sacp/
jsonrpc.rs

1//! Core JSON-RPC server support.
2
3use agent_client_protocol_schema::SessionId;
4// Re-export jsonrpcmsg for use in public API
5pub use jsonrpcmsg;
6
7// Types re-exported from crate root
8use serde::{Deserialize, Serialize};
9use std::fmt::Debug;
10use std::panic::Location;
11use std::pin::pin;
12use uuid::Uuid;
13
14use boxfnonce::SendBoxFnOnce;
15use futures::channel::{mpsc, oneshot};
16use futures::future::{self, BoxFuture, Either};
17use futures::{AsyncRead, AsyncWrite, StreamExt};
18
19mod dynamic_handler;
20pub(crate) mod handlers;
21mod incoming_actor;
22mod outgoing_actor;
23pub(crate) mod responder;
24mod task_actor;
25mod transport_actor;
26
27use crate::jsonrpc::dynamic_handler::DynamicHandlerMessage;
28pub use crate::jsonrpc::handlers::NullHandler;
29use crate::jsonrpc::handlers::{ChainedHandler, NamedHandler};
30use crate::jsonrpc::handlers::{MessageHandler, NotificationHandler, RequestHandler};
31use crate::jsonrpc::outgoing_actor::{OutgoingMessageTx, send_raw_message};
32use crate::jsonrpc::responder::SpawnedResponder;
33use crate::jsonrpc::responder::{ChainResponder, JrResponder, NullResponder};
34use crate::jsonrpc::task_actor::{Task, TaskTx};
35use crate::link::{HasDefaultPeer, HasPeer, JrLink};
36use crate::mcp_server::McpServer;
37use crate::peer::JrPeer;
38use crate::{AgentPeer, ClientPeer, Component};
39
40/// Handlers process incoming JSON-RPC messages on a [`JrConnection`].
41///
42/// When messages arrive, they flow through a chain of handlers. Each handler can
43/// either **claim** the message (handle it) or **decline** it (pass to the next handler).
44///
45/// # Message Flow
46///
47/// Messages flow through three layers of handlers in order:
48///
49/// ```text
50/// ┌─────────────────────────────────────────────────────────────────┐
51/// │                     Incoming Message                            │
52/// └─────────────────────────────────────────────────────────────────┘
53///                              │
54///                              ▼
55/// ┌─────────────────────────────────────────────────────────────────┐
56/// │  1. User Handlers (registered via on_receive_request, etc.)     │
57/// │     - Tried in registration order                               │
58/// │     - First handler to return Handled::Yes claims the message   │
59/// └─────────────────────────────────────────────────────────────────┘
60///                              │ Handled::No
61///                              ▼
62/// ┌─────────────────────────────────────────────────────────────────┐
63/// │  2. Dynamic Handlers (added at runtime)                         │
64/// │     - Used for session-specific message handling                │
65/// │     - Added via JrConnectionCx::add_dynamic_handler             │
66/// └─────────────────────────────────────────────────────────────────┘
67///                              │ Handled::No
68///                              ▼
69/// ┌─────────────────────────────────────────────────────────────────┐
70/// │  3. Role Default Handler                                        │
71/// │     - Fallback based on the connection's JrLink                 │
72/// │     - Handles protocol-level messages (e.g., proxy forwarding)  │
73/// └─────────────────────────────────────────────────────────────────┘
74///                              │ Handled::No
75///                              ▼
76/// ┌─────────────────────────────────────────────────────────────────┐
77/// │  Unhandled: Error response sent (or queued if retry=true)       │
78/// └─────────────────────────────────────────────────────────────────┘
79/// ```
80///
81/// # The `Handled` Return Value
82///
83/// Each handler returns [`Handled`] to indicate whether it processed the message:
84///
85/// - **`Handled::Yes`** - Message was handled. No further handlers are invoked.
86/// - **`Handled::No { message, retry }`** - Message was not handled. The message
87///   (possibly modified) is passed to the next handler in the chain.
88///
89/// For convenience, handlers can return `()` which is equivalent to `Handled::Yes`.
90///
91/// # The Retry Mechanism
92///
93/// The `retry` flag in `Handled::No` controls what happens when no handler claims a message:
94///
95/// - **`retry: false`** (default) - Send a "method not found" error response immediately.
96/// - **`retry: true`** - Queue the message and retry it when new dynamic handlers are added.
97///
98/// This mechanism exists because of a timing issue with sessions: when a `session/new`
99/// response is being processed, the dynamic handler for that session hasn't been registered
100/// yet, but `session/update` notifications for that session may already be arriving.
101/// By setting `retry: true`, these early notifications are queued until the session's
102/// dynamic handler is added.
103///
104/// # Handler Registration
105///
106/// Most users register handlers using the builder methods on [`JrConnectionBuilder`]:
107///
108/// ```
109/// # use sacp::{AgentToClient, ClientToAgent, Component};
110/// # use sacp::schema::{InitializeRequest, InitializeResponse, AgentCapabilities};
111/// # use sacp_test::StatusUpdate;
112/// # async fn example(transport: impl Component<ClientToAgent>) -> Result<(), sacp::Error> {
113/// AgentToClient::builder()
114///     .on_receive_request(async |req: InitializeRequest, request_cx, cx| {
115///         request_cx.respond(
116///             InitializeResponse::new(req.protocol_version)
117///                 .agent_capabilities(AgentCapabilities::new()),
118///         )
119///     }, sacp::on_receive_request!())
120///     .on_receive_notification(async |notif: StatusUpdate, cx| {
121///         // Process notification
122///         Ok(())
123///     }, sacp::on_receive_notification!())
124///     .serve(transport)
125///     .await?;
126/// # Ok(())
127/// # }
128/// ```
129///
130/// The type parameter on the closure determines which messages are dispatched to it.
131/// Messages that don't match the type are automatically passed to the next handler.
132///
133/// # Implementing Custom Handlers
134///
135/// For advanced use cases, you can implement `JrMessageHandler` directly:
136///
137/// ```ignore
138/// struct MyHandler;
139///
140/// impl JrMessageHandler for MyHandler {
141///     type Link = ClientToAgent;
142///
143///     async fn handle_message(
144///         &mut self,
145///         message: MessageCx,
146///         cx: JrConnectionCx<Self::Role>,
147///     ) -> Result<Handled<MessageCx>, Error> {
148///         if message.method() == "my/custom/method" {
149///             // Handle it
150///             Ok(Handled::Yes)
151///         } else {
152///             // Pass to next handler
153///             Ok(Handled::No { message, retry: false })
154///         }
155///     }
156///
157///     fn describe_chain(&self) -> impl std::fmt::Debug {
158///         "MyHandler"
159///     }
160/// }
161/// ```
162///
163/// # Important: Handlers Must Not Block
164///
165/// The connection processes messages on a single async task. While a handler is running,
166/// no other messages can be processed. For expensive operations, use [`JrConnectionCx::spawn`]
167/// to run work concurrently:
168///
169/// ```
170/// # use sacp::{ClientToAgent, AgentToClient, Component};
171/// # use sacp_test::{expensive_operation, ProcessComplete};
172/// # async fn example(transport: impl Component<AgentToClient>) -> Result<(), sacp::Error> {
173/// # ClientToAgent::builder().run_until(transport, async |cx| {
174/// cx.spawn({
175///     let connection_cx = cx.clone();
176///     async move {
177///         let result = expensive_operation("data").await?;
178///         connection_cx.send_notification(ProcessComplete { result })?;
179///         Ok(())
180///     }
181/// })?;
182/// # Ok(())
183/// # }).await?;
184/// # Ok(())
185/// # }
186/// ```
187#[allow(async_fn_in_trait)]
188/// A handler for incoming JSON-RPC messages.
189///
190/// This trait is implemented by types that can process incoming messages on a connection.
191/// Handlers are registered with a [`JrConnectionBuilder`] and are called in order until
192/// one claims the message.
193pub trait JrMessageHandler: Send {
194    /// The role type for this handler's connection.
195    type Link: JrLink;
196
197    /// Attempt to claim an incoming message (request or notification).
198    ///
199    /// # Important: do not block
200    ///
201    /// The server will not process new messages until this handler returns.
202    /// You should avoid blocking in this callback unless you wish to block the server (e.g., for rate limiting).
203    /// The recommended approach to manage expensive operations is to the [`JrConnectionCx::spawn`] method available on the message context.
204    ///
205    /// # Parameters
206    ///
207    /// * `message` - The incoming message to handle.
208    /// * `cx` - The connection context, used to send messages and access connection state.
209    ///
210    /// # Returns
211    ///
212    /// * `Ok(Handled::Yes)` if the message was claimed. It will not be propagated further.
213    /// * `Ok(Handled::No(message))` if not; the (possibly changed) message will be passed to the remaining handlers.
214    /// * `Err` if an internal error occurs (this will bring down the server).
215    fn handle_message(
216        &mut self,
217        message: MessageCx,
218        cx: JrConnectionCx<Self::Link>,
219    ) -> impl Future<Output = Result<Handled<MessageCx>, crate::Error>> + Send;
220
221    /// Returns a debug description of the registered handlers for diagnostics.
222    fn describe_chain(&self) -> impl std::fmt::Debug;
223}
224
225impl<H: JrMessageHandler> JrMessageHandler for &mut H {
226    type Link = H::Link;
227
228    fn handle_message(
229        &mut self,
230        message: MessageCx,
231        cx: JrConnectionCx<Self::Link>,
232    ) -> impl Future<Output = Result<Handled<MessageCx>, crate::Error>> + Send {
233        H::handle_message(self, message, cx)
234    }
235
236    fn describe_chain(&self) -> impl std::fmt::Debug {
237        H::describe_chain(self)
238    }
239}
240
241/// A JSON-RPC connection that can act as either a server, client, or both.
242///
243/// `JrConnection` provides a builder-style API for creating JSON-RPC servers and clients.
244/// You start by calling `Link::builder()` (e.g., `ClientToAgent::builder()`), then add message
245/// handlers, and finally drive the connection with either [`serve`](JrConnectionBuilder::serve)
246/// or [`run_until`](JrConnectionBuilder::run_until), providing a component implementation
247/// (e.g., [`ByteStreams`] for byte streams).
248///
249/// # JSON-RPC Primer
250///
251/// JSON-RPC 2.0 has two fundamental message types:
252///
253/// * **Requests** - Messages that expect a response. They have an `id` field that gets
254///   echoed back in the response so the sender can correlate them.
255/// * **Notifications** - Fire-and-forget messages with no `id` field. The sender doesn't
256///   expect or receive a response.
257///
258/// # Type-Driven Message Dispatch
259///
260/// The handler registration methods use Rust's type system to determine which messages
261/// to handle. The type parameter you provide controls what gets dispatched to your handler:
262///
263/// ## Single Message Types
264///
265/// The simplest case - handle one specific message type:
266///
267/// ```no_run
268/// # use sacp_test::*;
269/// # use sacp::schema::{InitializeRequest, InitializeResponse, SessionNotification};
270/// # async fn example() -> Result<(), sacp::Error> {
271/// # let connection = mock_connection();
272/// connection
273///     .on_receive_request(async |req: InitializeRequest, request_cx, cx| {
274///         // Handle only InitializeRequest messages
275///         request_cx.respond(InitializeResponse::make())
276///     }, sacp::on_receive_request!())
277///     .on_receive_notification(async |notif: SessionNotification, cx| {
278///         // Handle only SessionUpdate notifications
279///         Ok(())
280///     }, sacp::on_receive_notification!())
281/// # .serve(sacp_test::MockTransport).await?;
282/// # Ok(())
283/// # }
284/// ```
285///
286/// ## Enum Message Types
287///
288/// You can also handle multiple related messages with a single handler by defining an enum
289/// that implements the appropriate trait ([`JrRequest`] or [`JrNotification`]):
290///
291/// ```no_run
292/// # use sacp_test::*;
293/// # use sacp::{JrRequest, JrMessage, UntypedMessage};
294/// # use sacp::schema::{InitializeRequest, InitializeResponse, PromptRequest, PromptResponse};
295/// # async fn example() -> Result<(), sacp::Error> {
296/// # let connection = mock_connection();
297/// // Define an enum for multiple request types
298/// #[derive(Debug, Clone)]
299/// enum MyRequests {
300///     Initialize(InitializeRequest),
301///     Prompt(PromptRequest),
302/// }
303///
304/// // Implement JrRequest for your enum
305/// # impl JrMessage for MyRequests {
306/// #     fn method(&self) -> &str { "myRequests" }
307/// #     fn to_untyped_message(&self) -> Result<UntypedMessage, sacp::Error> { todo!() }
308/// #     fn parse_message(_method: &str, _params: &impl serde::Serialize) -> Option<Result<Self, sacp::Error>> { None }
309/// # }
310/// impl JrRequest for MyRequests { type Response = serde_json::Value; }
311///
312/// // Handle all variants in one place
313/// connection.on_receive_request(async |req: MyRequests, request_cx, cx| {
314///     match req {
315///         MyRequests::Initialize(init) => { request_cx.respond(serde_json::json!({})) }
316///         MyRequests::Prompt(prompt) => { request_cx.respond(serde_json::json!({})) }
317///     }
318/// }, sacp::on_receive_request!())
319/// # .serve(sacp_test::MockTransport).await?;
320/// # Ok(())
321/// # }
322/// ```
323///
324/// ## Mixed Message Types
325///
326/// For enums containing both requests AND notifications, use [`on_receive_message`](Self::on_receive_message):
327///
328/// ```no_run
329/// # use sacp_test::*;
330/// # use sacp::MessageCx;
331/// # use sacp::schema::{InitializeRequest, InitializeResponse, SessionNotification};
332/// # async fn example() -> Result<(), sacp::Error> {
333/// # let connection = mock_connection();
334/// // on_receive_message receives MessageCx which can be either a request or notification
335/// connection.on_receive_message(async |msg: MessageCx<InitializeRequest, SessionNotification>, _cx| {
336///     match msg {
337///         MessageCx::Request(req, request_cx) => {
338///             request_cx.respond(InitializeResponse::make())
339///         }
340///         MessageCx::Notification(notif) => {
341///             Ok(())
342///         }
343///     }
344/// }, sacp::on_receive_message!())
345/// # .serve(sacp_test::MockTransport).await?;
346/// # Ok(())
347/// # }
348/// ```
349///
350/// # Handler Registration
351///
352/// Register handlers using these methods (listed from most common to most flexible):
353///
354/// * [`on_receive_request`](Self::on_receive_request) - Handle JSON-RPC requests (messages expecting responses)
355/// * [`on_receive_notification`](Self::on_receive_notification) - Handle JSON-RPC notifications (fire-and-forget)
356/// * [`on_receive_message`](Self::on_receive_message) - Handle enums containing both requests and notifications
357/// * [`with_handler`](Self::with_handler) - Low-level primitive for maximum flexibility
358///
359/// ## Handler Ordering
360///
361/// Handlers are tried in the order you register them. The first handler that claims a message
362/// (by matching its type) will process it. Subsequent handlers won't see that message:
363///
364/// ```no_run
365/// # use sacp_test::*;
366/// # use sacp::{MessageCx, UntypedMessage};
367/// # use sacp::schema::{InitializeRequest, InitializeResponse, PromptRequest, PromptResponse};
368/// # async fn example() -> Result<(), sacp::Error> {
369/// # let connection = mock_connection();
370/// connection
371///     .on_receive_request(async |req: InitializeRequest, request_cx, cx| {
372///         // This runs first for InitializeRequest
373///         request_cx.respond(InitializeResponse::make())
374///     }, sacp::on_receive_request!())
375///     .on_receive_request(async |req: PromptRequest, request_cx, cx| {
376///         // This runs first for PromptRequest
377///         request_cx.respond(PromptResponse::make())
378///     }, sacp::on_receive_request!())
379///     .on_receive_message(async |msg: MessageCx, cx| {
380///         // This runs for any message not handled above
381///         msg.respond_with_error(sacp::util::internal_error("unknown method"), cx)
382///     }, sacp::on_receive_message!())
383/// # .serve(sacp_test::MockTransport).await?;
384/// # Ok(())
385/// # }
386/// ```
387///
388/// # Event Loop and Concurrency
389///
390/// Understanding the event loop is critical for writing correct handlers.
391///
392/// ## The Event Loop
393///
394/// `JrConnection` runs all handler callbacks on a single async task - the event loop.
395/// While a handler is running, **the server cannot receive new messages**. This means
396/// any blocking or expensive work in your handlers will stall the entire connection.
397///
398/// To avoid blocking the event loop, use [`JrConnectionCx::spawn`] to offload serious
399/// work to concurrent tasks:
400///
401/// ```no_run
402/// # use sacp_test::*;
403/// # async fn example() -> Result<(), sacp::Error> {
404/// # let connection = mock_connection();
405/// connection.on_receive_request(async |req: AnalyzeRequest, request_cx, cx| {
406///     // Clone cx for the spawned task
407///     cx.spawn({
408///         let connection_cx = cx.clone();
409///         async move {
410///             let result = expensive_analysis(&req.data).await?;
411///             connection_cx.send_notification(AnalysisComplete { result })?;
412///             Ok(())
413///         }
414///     })?;
415///
416///     // Respond immediately without blocking
417///     request_cx.respond(AnalysisStarted { job_id: 42 })
418/// }, sacp::on_receive_request!())
419/// # .serve(sacp_test::MockTransport).await?;
420/// # Ok(())
421/// # }
422/// ```
423///
424/// Note that the entire connection runs within one async task, so parallelism must be
425/// managed explicitly using [`spawn`](JrConnectionCx::spawn).
426///
427/// ## The Connection Context
428///
429/// Handler callbacks receive a context object (`cx`) for interacting with the connection:
430///
431/// * **For request handlers** - [`JrRequestCx<R>`] provides [`respond`](JrRequestCx::respond)
432///   to send the response, plus methods to send other messages
433/// * **For notification handlers** - [`JrConnectionCx`] provides methods to send messages
434///   and spawn tasks
435///
436/// Both context types support:
437/// * [`send_request`](JrConnectionCx::send_request) - Send requests to the other side
438/// * [`send_notification`](JrConnectionCx::send_notification) - Send notifications
439/// * [`spawn`](JrConnectionCx::spawn) - Run tasks concurrently without blocking the event loop
440///
441/// The [`JrResponse`] returned by `send_request` provides methods like
442/// [`on_receiving_result`](JrResponse::on_receiving_result) that help you
443/// avoid accidentally blocking the event loop while waiting for responses.
444///
445/// # Driving the Connection
446///
447/// After adding handlers, you must drive the connection using one of two modes:
448///
449/// ## Server Mode: `serve()`
450///
451/// Use [`serve`](Self::serve) when you only need to respond to incoming messages:
452///
453/// ```no_run
454/// # use sacp_test::*;
455/// # async fn example() -> Result<(), sacp::Error> {
456/// # let connection = mock_connection();
457/// connection
458///     .on_receive_request(async |req: MyRequest, request_cx, cx| {
459///         request_cx.respond(MyResponse { status: "ok".into() })
460///     }, sacp::on_receive_request!())
461///     .serve(MockTransport)  // Runs until connection closes or error occurs
462///     .await?;
463/// # Ok(())
464/// # }
465/// ```
466///
467/// The connection will process incoming messages and invoke your handlers until the
468/// connection is closed or an error occurs.
469///
470/// ## Client Mode: `run_until()`
471///
472/// Use [`run_until`](Self::run_until) when you need to both handle incoming messages
473/// AND send your own requests/notifications:
474///
475/// ```no_run
476/// # use sacp_test::*;
477/// # use sacp::schema::InitializeRequest;
478/// # async fn example() -> Result<(), sacp::Error> {
479/// # let connection = mock_connection();
480/// connection
481///     .on_receive_request(async |req: MyRequest, request_cx, cx| {
482///         request_cx.respond(MyResponse { status: "ok".into() })
483///     }, sacp::on_receive_request!())
484///     .run_until(MockTransport, async |cx| {
485///         // You can send requests to the other side
486///         let response = cx.send_request(InitializeRequest::make())
487///             .block_task()
488///             .await?;
489///
490///         // And send notifications
491///         cx.send_notification(StatusUpdate { message: "ready".into() })?;
492///
493///         Ok(())
494///     })
495///     .await?;
496/// # Ok(())
497/// # }
498/// ```
499///
500/// The connection will serve incoming messages in the background while your client closure
501/// runs. When the closure returns, the connection shuts down.
502///
503/// # Example: Complete Agent
504///
505/// ```no_run
506/// # use sacp::link::UntypedLink;
507/// # use sacp::{JrConnectionBuilder};
508/// # use sacp::ByteStreams;
509/// # use sacp::schema::{InitializeRequest, InitializeResponse, PromptRequest, PromptResponse, SessionNotification};
510/// # use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
511/// # async fn example() -> Result<(), sacp::Error> {
512/// let transport = ByteStreams::new(
513///     tokio::io::stdout().compat_write(),
514///     tokio::io::stdin().compat(),
515/// );
516///
517/// UntypedLink::builder()
518///     .name("my-agent")  // Optional: for debugging logs
519///     .on_receive_request(async |init: InitializeRequest, request_cx, cx| {
520///         let response: InitializeResponse = todo!();
521///         request_cx.respond(response)
522///     }, sacp::on_receive_request!())
523///     .on_receive_request(async |prompt: PromptRequest, request_cx, cx| {
524///         // You can send notifications while processing a request
525///         let notif: SessionNotification = todo!();
526///         cx.send_notification(notif)?;
527///
528///         // Then respond to the request
529///         let response: PromptResponse = todo!();
530///         request_cx.respond(response)
531///     }, sacp::on_receive_request!())
532///     .serve(transport)
533///     .await?;
534/// # Ok(())
535/// # }
536/// ```
537#[must_use]
538pub struct JrConnectionBuilder<H: JrMessageHandler, R: JrResponder<H::Link> = NullResponder> {
539    name: Option<String>,
540
541    /// Handler for incoming messages.
542    handler: H,
543
544    /// Responder for background tasks.
545    responder: R,
546}
547
548impl<Link: JrLink> JrConnectionBuilder<NullHandler<Link>, NullResponder> {
549    /// Create a new JrConnection with the given role.
550    /// This type follows a builder pattern; use other methods to configure and then invoke
551    /// [`Self::serve`] (to use as a server) or [`Self::run_until`] to use as a client.
552    pub(crate) fn new(role: Link) -> Self {
553        Self {
554            name: Default::default(),
555            handler: NullHandler::new(role),
556            responder: NullResponder,
557        }
558    }
559}
560
561impl<H: JrMessageHandler> JrConnectionBuilder<H, NullResponder> {
562    /// Create a new connection builder with the given handler.
563    pub fn new_with(handler: H) -> Self {
564        Self {
565            name: Default::default(),
566            handler,
567            responder: NullResponder,
568        }
569    }
570}
571
572impl<H: JrMessageHandler, R: JrResponder<H::Link>> JrConnectionBuilder<H, R> {
573    /// Set the "name" of this connection -- used only for debugging logs.
574    pub fn name(mut self, name: impl ToString) -> Self {
575        self.name = Some(name.to_string());
576        self
577    }
578
579    /// Merge another [`JrConnectionBuilder`] into this one.
580    ///
581    /// Prefer [`Self::on_receive_request`] or [`Self::on_receive_notification`].
582    /// This is a low-level method that is not intended for general use.
583    pub fn with_connection_builder<H1, R1>(
584        self,
585        other: JrConnectionBuilder<H1, R1>,
586    ) -> JrConnectionBuilder<impl JrMessageHandler<Link = H::Link>, impl JrResponder<H::Link>>
587    where
588        H1: JrMessageHandler<Link = H::Link>,
589        R1: JrResponder<H::Link>,
590    {
591        JrConnectionBuilder {
592            name: self.name,
593            handler: ChainedHandler::new(
594                self.handler,
595                NamedHandler::new(other.name, other.handler),
596            ),
597            responder: ChainResponder::new(self.responder, other.responder),
598        }
599    }
600
601    /// Add a new [`JrMessageHandler`] to the chain.
602    ///
603    /// Prefer [`Self::on_receive_request`] or [`Self::on_receive_notification`].
604    /// This is a low-level method that is not intended for general use.
605    pub fn with_handler<H1>(
606        self,
607        handler: H1,
608    ) -> JrConnectionBuilder<impl JrMessageHandler<Link = H::Link>, R>
609    where
610        H1: JrMessageHandler<Link = H::Link>,
611    {
612        JrConnectionBuilder {
613            name: self.name,
614            handler: ChainedHandler::new(self.handler, handler),
615            responder: self.responder,
616        }
617    }
618
619    /// Add a new [`JrResponder`] to the chain.
620    pub fn with_responder<R1>(
621        self,
622        responder: R1,
623    ) -> JrConnectionBuilder<H, impl JrResponder<H::Link>>
624    where
625        R1: JrResponder<H::Link>,
626    {
627        JrConnectionBuilder {
628            name: self.name,
629            handler: self.handler,
630            responder: ChainResponder::new(self.responder, responder),
631        }
632    }
633
634    /// Enqueue a task to run once the connection is actively serving traffic.
635    #[track_caller]
636    pub fn with_spawned<F, Fut>(self, task: F) -> JrConnectionBuilder<H, impl JrResponder<H::Link>>
637    where
638        F: FnOnce(JrConnectionCx<H::Link>) -> Fut + Send,
639        Fut: Future<Output = Result<(), crate::Error>> + Send,
640    {
641        let location = Location::caller();
642        self.with_responder(SpawnedResponder::new(location, task))
643    }
644
645    /// Register a handler for messages that can be either requests OR notifications.
646    ///
647    /// Use this when you want to handle an enum type that contains both request and
648    /// notification variants. Your handler receives a [`MessageCx<Req, Notif>`] which
649    /// is an enum with two variants:
650    ///
651    /// - `MessageCx::Request(request, request_cx)` - A request with its response context
652    /// - `MessageCx::Notification(notification)` - A notification
653    ///
654    /// # Example
655    ///
656    /// ```no_run
657    /// # use sacp_test::*;
658    /// # use sacp::MessageCx;
659    /// # async fn example() -> Result<(), sacp::Error> {
660    /// # let connection = mock_connection();
661    /// connection.on_receive_message(async |message: MessageCx<MyRequest, StatusUpdate>, _cx| {
662    ///     match message {
663    ///         MessageCx::Request(req, request_cx) => {
664    ///             // Handle request and send response
665    ///             request_cx.respond(MyResponse { status: "ok".into() })
666    ///         }
667    ///         MessageCx::Notification(notif) => {
668    ///             // Handle notification (no response needed)
669    ///             Ok(())
670    ///         }
671    ///     }
672    /// }, sacp::on_receive_message!())
673    /// # .serve(sacp_test::MockTransport).await?;
674    /// # Ok(())
675    /// # }
676    /// ```
677    ///
678    /// For most use cases, prefer [`on_receive_request`](Self::on_receive_request) or
679    /// [`on_receive_notification`](Self::on_receive_notification) which provide cleaner APIs
680    /// for handling requests or notifications separately.
681    ///
682    /// # Ordering
683    ///
684    /// This callback runs inside the dispatch loop and blocks further message processing
685    /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
686    /// ordering guarantees and how to avoid deadlocks.
687    pub fn on_receive_message<Req, Notif, F, T, ToFut>(
688        self,
689        op: F,
690        to_future_hack: ToFut,
691    ) -> JrConnectionBuilder<impl JrMessageHandler<Link = H::Link>, R>
692    where
693        H::Link: HasDefaultPeer,
694        Req: JrRequest,
695        Notif: JrNotification,
696        F: AsyncFnMut(MessageCx<Req, Notif>, JrConnectionCx<H::Link>) -> Result<T, crate::Error>
697            + Send,
698        T: IntoHandled<MessageCx<Req, Notif>>,
699        ToFut: Fn(
700                &mut F,
701                MessageCx<Req, Notif>,
702                JrConnectionCx<H::Link>,
703            ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
704            + Send
705            + Sync,
706    {
707        self.with_handler(MessageHandler::new(
708            <H::Link as HasDefaultPeer>::DefaultPeer::default(),
709            <H::Link>::default(),
710            op,
711            to_future_hack,
712        ))
713    }
714
715    /// Register a handler for JSON-RPC requests of type `Req`.
716    ///
717    /// Your handler receives two arguments:
718    /// 1. The request (type `Req`)
719    /// 2. A [`JrRequestCx<R, Req::Response>`] for sending the response
720    ///
721    /// The request context allows you to:
722    /// - Send the response with [`JrRequestCx::respond`]
723    /// - Send notifications to the client with [`JrConnectionCx::send_notification`]
724    /// - Send requests to the client with [`JrConnectionCx::send_request`]
725    ///
726    /// # Example
727    ///
728    /// ```ignore
729    /// # use sacp::link::UntypedLink;
730    /// # use sacp::{JrConnectionBuilder};
731    /// # use sacp::schema::{PromptRequest, PromptResponse, SessionNotification};
732    /// # fn example(connection: JrConnectionBuilder<impl sacp::JrMessageHandler<Link = UntypedLink>>) {
733    /// connection.on_receive_request(async |request: PromptRequest, request_cx, cx| {
734    ///     // Send a notification while processing
735    ///     let notif: SessionNotification = todo!();
736    ///     cx.send_notification(notif)?;
737    ///
738    ///     // Do some work...
739    ///     let result = todo!("process the prompt");
740    ///
741    ///     // Send the response
742    ///     let response: PromptResponse = todo!();
743    ///     request_cx.respond(response)
744    /// }, sacp::on_receive_request!());
745    /// # }
746    /// ```
747    ///
748    /// # Type Parameter
749    ///
750    /// `Req` can be either a single request type or an enum of multiple request types.
751    /// See the [type-driven dispatch](Self#type-driven-message-dispatch) section for details.
752    ///
753    /// # Ordering
754    ///
755    /// This callback runs inside the dispatch loop and blocks further message processing
756    /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
757    /// ordering guarantees and how to avoid deadlocks.
758    pub fn on_receive_request<Req: JrRequest, F, T, ToFut>(
759        self,
760        op: F,
761        to_future_hack: ToFut,
762    ) -> JrConnectionBuilder<impl JrMessageHandler<Link = H::Link>, R>
763    where
764        H::Link: HasDefaultPeer,
765        F: AsyncFnMut(
766                Req,
767                JrRequestCx<Req::Response>,
768                JrConnectionCx<H::Link>,
769            ) -> Result<T, crate::Error>
770            + Send,
771        T: IntoHandled<(Req, JrRequestCx<Req::Response>)>,
772        ToFut: Fn(
773                &mut F,
774                Req,
775                JrRequestCx<Req::Response>,
776                JrConnectionCx<H::Link>,
777            ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
778            + Send
779            + Sync,
780    {
781        self.with_handler(RequestHandler::new(
782            <H::Link as HasDefaultPeer>::DefaultPeer::default(),
783            <H::Link>::default(),
784            op,
785            to_future_hack,
786        ))
787    }
788
789    /// Register a handler for JSON-RPC notifications of type `Notif`.
790    ///
791    /// Notifications are fire-and-forget messages that don't expect a response.
792    /// Your handler receives:
793    /// 1. The notification (type `Notif`)
794    /// 2. A [`JrConnectionCx<R>`] for sending messages to the other side
795    ///
796    /// Unlike request handlers, you cannot send a response (notifications don't have IDs),
797    /// but you can still send your own requests and notifications using the context.
798    ///
799    /// # Example
800    ///
801    /// ```no_run
802    /// # use sacp_test::*;
803    /// # async fn example() -> Result<(), sacp::Error> {
804    /// # let connection = mock_connection();
805    /// connection.on_receive_notification(async |notif: SessionUpdate, cx| {
806    ///     // Process the notification
807    ///     update_session_state(&notif)?;
808    ///
809    ///     // Optionally send a notification back
810    ///     cx.send_notification(StatusUpdate {
811    ///         message: "Acknowledged".into(),
812    ///     })?;
813    ///
814    ///     Ok(())
815    /// }, sacp::on_receive_notification!())
816    /// # .serve(sacp_test::MockTransport).await?;
817    /// # Ok(())
818    /// # }
819    /// ```
820    ///
821    /// # Type Parameter
822    ///
823    /// `Notif` can be either a single notification type or an enum of multiple notification types.
824    /// See the [type-driven dispatch](Self#type-driven-message-dispatch) section for details.
825    ///
826    /// # Ordering
827    ///
828    /// This callback runs inside the dispatch loop and blocks further message processing
829    /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
830    /// ordering guarantees and how to avoid deadlocks.
831    pub fn on_receive_notification<Notif, F, T, ToFut>(
832        self,
833        op: F,
834        to_future_hack: ToFut,
835    ) -> JrConnectionBuilder<impl JrMessageHandler<Link = H::Link>, R>
836    where
837        H::Link: HasDefaultPeer,
838        Notif: JrNotification,
839        F: AsyncFnMut(Notif, JrConnectionCx<H::Link>) -> Result<T, crate::Error> + Send,
840        T: IntoHandled<(Notif, JrConnectionCx<H::Link>)>,
841        ToFut: Fn(
842                &mut F,
843                Notif,
844                JrConnectionCx<H::Link>,
845            ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
846            + Send
847            + Sync,
848    {
849        self.with_handler(NotificationHandler::new(
850            <H::Link as HasDefaultPeer>::DefaultPeer::default(),
851            <H::Link>::default(),
852            op,
853            to_future_hack,
854        ))
855    }
856
857    /// Register a handler for messages from a specific peer.
858    ///
859    /// This is similar to [`on_receive_message`](Self::on_receive_message), but allows
860    /// specifying the source peer explicitly. This is useful when receiving messages
861    /// from a peer that requires message transformation (e.g., unwrapping `SuccessorMessage`
862    /// envelopes when receiving from an agent via a proxy).
863    ///
864    /// For the common case of receiving from the default counterpart, use
865    /// [`on_receive_message`](Self::on_receive_message) instead.
866    ///
867    /// # Ordering
868    ///
869    /// This callback runs inside the dispatch loop and blocks further message processing
870    /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
871    /// ordering guarantees and how to avoid deadlocks.
872    pub fn on_receive_message_from<
873        Req: JrRequest,
874        Notif: JrNotification,
875        Peer: JrPeer,
876        F,
877        T,
878        ToFut,
879    >(
880        self,
881        peer: Peer,
882        op: F,
883        to_future_hack: ToFut,
884    ) -> JrConnectionBuilder<impl JrMessageHandler<Link = H::Link>, R>
885    where
886        H::Link: HasPeer<Peer>,
887        F: AsyncFnMut(MessageCx<Req, Notif>, JrConnectionCx<H::Link>) -> Result<T, crate::Error>
888            + Send,
889        T: IntoHandled<MessageCx<Req, Notif>>,
890        ToFut: Fn(
891                &mut F,
892                MessageCx<Req, Notif>,
893                JrConnectionCx<H::Link>,
894            ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
895            + Send
896            + Sync,
897    {
898        self.with_handler(MessageHandler::new(
899            peer,
900            <H::Link>::default(),
901            op,
902            to_future_hack,
903        ))
904    }
905
906    /// Register a handler for JSON-RPC requests from a specific peer.
907    ///
908    /// This is similar to [`on_receive_request`](Self::on_receive_request), but allows
909    /// specifying the source peer explicitly. This is useful when receiving messages
910    /// from a peer that requires message transformation (e.g., unwrapping `SuccessorRequest`
911    /// envelopes when receiving from an agent via a proxy).
912    ///
913    /// For the common case of receiving from the default counterpart, use
914    /// [`on_receive_request`](Self::on_receive_request) instead.
915    ///
916    /// # Example
917    ///
918    /// ```ignore
919    /// use sacp::Agent;
920    /// use sacp::schema::InitializeRequest;
921    ///
922    /// // Conductor receiving from agent direction - messages will be unwrapped from SuccessorMessage
923    /// connection.on_receive_request_from(Agent, async |req: InitializeRequest, request_cx, cx| {
924    ///     // Handle the request
925    ///     request_cx.respond(InitializeResponse::make())
926    /// })
927    /// ```
928    ///
929    /// # Ordering
930    ///
931    /// This callback runs inside the dispatch loop and blocks further message processing
932    /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
933    /// ordering guarantees and how to avoid deadlocks.
934    pub fn on_receive_request_from<Req: JrRequest, Peer: JrPeer, F, T, ToFut>(
935        self,
936        peer: Peer,
937        op: F,
938        to_future_hack: ToFut,
939    ) -> JrConnectionBuilder<impl JrMessageHandler<Link = H::Link>, R>
940    where
941        H::Link: HasPeer<Peer>,
942        F: AsyncFnMut(
943                Req,
944                JrRequestCx<Req::Response>,
945                JrConnectionCx<H::Link>,
946            ) -> Result<T, crate::Error>
947            + Send,
948        T: IntoHandled<(Req, JrRequestCx<Req::Response>)>,
949        ToFut: Fn(
950                &mut F,
951                Req,
952                JrRequestCx<Req::Response>,
953                JrConnectionCx<H::Link>,
954            ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
955            + Send
956            + Sync,
957    {
958        self.with_handler(RequestHandler::new(
959            peer,
960            <H::Link>::default(),
961            op,
962            to_future_hack,
963        ))
964    }
965
966    /// Register a handler for JSON-RPC notifications from a specific peer.
967    ///
968    /// This is similar to [`on_receive_notification`](Self::on_receive_notification), but allows
969    /// specifying the source peer explicitly. This is useful when receiving messages
970    /// from a peer that requires message transformation (e.g., unwrapping `SuccessorNotification`
971    /// envelopes when receiving from an agent via a proxy).
972    ///
973    /// For the common case of receiving from the default counterpart, use
974    /// [`on_receive_notification`](Self::on_receive_notification) instead.
975    ///
976    /// # Ordering
977    ///
978    /// This callback runs inside the dispatch loop and blocks further message processing
979    /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
980    /// ordering guarantees and how to avoid deadlocks.
981    pub fn on_receive_notification_from<Notif: JrNotification, Peer: JrPeer, F, T, ToFut>(
982        self,
983        peer: Peer,
984        op: F,
985        to_future_hack: ToFut,
986    ) -> JrConnectionBuilder<impl JrMessageHandler<Link = H::Link>, R>
987    where
988        H::Link: HasPeer<Peer>,
989        F: AsyncFnMut(Notif, JrConnectionCx<H::Link>) -> Result<T, crate::Error> + Send,
990        T: IntoHandled<(Notif, JrConnectionCx<H::Link>)>,
991        ToFut: Fn(
992                &mut F,
993                Notif,
994                JrConnectionCx<H::Link>,
995            ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
996            + Send
997            + Sync,
998    {
999        self.with_handler(NotificationHandler::new(
1000            peer,
1001            <H::Link>::default(),
1002            op,
1003            to_future_hack,
1004        ))
1005    }
1006
1007    /// In a proxy, add this MCP server to new sessions passing through the proxy.
1008    ///
1009    /// This adds a handler that intercepts `session/new` requests to include the
1010    /// registered MCP server.
1011    ///
1012    /// # Example
1013    ///
1014    /// ```ignore
1015    /// use sacp::mcp_server::McpServiceRegistry;
1016    /// use sacp::ProxyToConductor;
1017    ///
1018    /// ProxyToConductor::builder()
1019    ///     .name("my-proxy")
1020    ///     .provide_mcp(McpServiceRegistry::default().with_mcp_server("example", my_server)?)
1021    ///     .serve(connection)
1022    ///     .await?;
1023    /// ```
1024    pub fn with_mcp_server<Link: JrLink, McpR: JrResponder<Link>>(
1025        self,
1026        server: McpServer<Link, McpR>,
1027    ) -> JrConnectionBuilder<impl JrMessageHandler<Link = Link>, impl JrResponder<Link>>
1028    where
1029        H: JrMessageHandler<Link = Link>,
1030        Link: HasPeer<ClientPeer> + HasPeer<AgentPeer>,
1031    {
1032        let (message_handler, mcp_responder) = server.into_handler_and_responder();
1033        self.with_handler(message_handler)
1034            .with_responder(mcp_responder)
1035    }
1036
1037    /// Connect these handlers to a transport layer.
1038    /// The resulting connection must then be either [served](`JrConnection::serve`) or [run until completion](`JrConnection::run_until`).
1039    pub fn connect_to(
1040        self,
1041        transport: impl Component<<H::Link as JrLink>::ConnectsTo> + 'static,
1042    ) -> Result<JrConnection<H, R>, crate::Error> {
1043        let Self {
1044            name,
1045            handler,
1046            responder,
1047        } = self;
1048
1049        let (outgoing_tx, outgoing_rx) = mpsc::unbounded();
1050        let (new_task_tx, new_task_rx) = mpsc::unbounded();
1051        let (dynamic_handler_tx, dynamic_handler_rx) = mpsc::unbounded();
1052        let cx = JrConnectionCx::new(outgoing_tx, new_task_tx, dynamic_handler_tx);
1053
1054        // Convert transport into server - this returns a channel for us to use
1055        // and a future that runs the transport
1056        let transport_component = crate::DynComponent::new(transport);
1057        let (transport_channel, transport_future) = transport_component.into_server();
1058        cx.spawn(transport_future)?;
1059
1060        // Destructure the channel endpoints
1061        let Channel {
1062            rx: transport_incoming_rx,
1063            tx: transport_outgoing_tx,
1064        } = transport_channel;
1065
1066        Ok(JrConnection {
1067            cx,
1068            name,
1069            outgoing_rx,
1070            new_task_rx,
1071            transport_outgoing_tx,
1072            transport_incoming_rx,
1073            dynamic_handler_rx,
1074            handler,
1075            responder,
1076        })
1077    }
1078
1079    /// Apply the registered handlers to a single message.
1080    ///
1081    /// This method processes one message through all registered handlers, attempting to
1082    /// match it against each handler in order. This is useful when implementing
1083    /// custom message handling logic or when you need fine-grained control over message
1084    /// processing.
1085    ///
1086    /// # Returns
1087    ///
1088    /// - `Ok(Handled::Claimed)` - A handler claimed and processed the message
1089    /// - `Ok(Handled::Unclaimed(message))` - No handler matched the message
1090    /// - `Err(_)` - A handler encountered an error while processing
1091    ///
1092    /// # Borrow Checker Considerations
1093    ///
1094    /// You may find that [`MatchMessage`](`crate::util::MatchMessage`) is a better choice than this method
1095    /// for implementing custom handlers. It offers a very similar API to
1096    /// [`JrConnectionBuilder`] but is structured to apply each test one at a time
1097    /// (sequentially) instead of setting them all up at once. This sequential approach
1098    /// often interacts better with the borrow checker, at the cost of requiring `.await`
1099    /// calls between each handler and only working for processing a single message.
1100    ///
1101    /// # Example: Borrow Checker Challenges
1102    ///
1103    /// When building a connection with `async {}` blocks (non-move), you might encounter
1104    /// borrow checker errors if multiple handlers need access to the same mutable state:
1105    ///
1106    /// ```compile_fail
1107    /// # use sacp::{JrConnectionBuilder, JrRequestCx};
1108    /// # use sacp::schema::{InitializeRequest, InitializeResponse};
1109    /// # use sacp::schema::{PromptRequest, PromptResponse};
1110    /// # async fn example() -> Result<(), sacp::Error> {
1111    /// let mut state = String::from("initial");
1112    ///
1113    /// // This fails to compile because both handlers borrow `state` mutably,
1114    /// // and the futures are set up at the same time (even though only one will run)
1115    /// let chain = UntypedLink::builder()
1116    ///     .on_receive_request(async |req: InitializeRequest, cx: JrRequestCx| {
1117    ///         state.push_str(" - initialized");  // First mutable borrow
1118    ///         cx.respond(InitializeResponse::make())
1119    ///     }, sacp::on_receive_request!())
1120    ///     .on_receive_request(async |req: PromptRequest, cx: JrRequestCx| {
1121    ///         state.push_str(" - prompted");  // Second mutable borrow - ERROR!
1122    ///         cx.respond(PromptResponse { content: vec![], stopReason: None })
1123    ///     }, sacp::on_receive_request!());
1124    /// # Ok(())
1125    /// # }
1126    /// ```
1127    ///
1128    /// You can work around this by using `apply()` to process messages one at a time,
1129    /// or use [`MatchMessage`](`crate::util::MatchMessage`) which provides a similar API but applies handlers sequentially:
1130    ///
1131    /// ```ignore
1132    /// use sacp::{MessageCx, Handled};
1133    /// use sacp::util::MatchMessage;
1134    ///
1135    /// async fn handle_with_state(
1136    ///     message: MessageCx,
1137    ///     state: &mut String,
1138    /// ) -> Result<Handled<MessageCx>, sacp::Error> {
1139    ///     MatchMessage::new(message)
1140    ///         .on_request(async |req: InitializeRequest, request_cx| {
1141    ///             state.push_str(" - initialized");  // Sequential - OK!
1142    ///             request_cx.respond(InitializeResponse::make())
1143    ///         })
1144    ///         .on_request(async |req: PromptRequest, request_cx| {
1145    ///             state.push_str(" - prompted");  // Sequential - OK!
1146    ///             request_cx.respond(PromptResponse { content: vec![], stopReason: None })
1147    ///         })
1148    ///         .otherwise(async |msg| Ok(Handled::Unclaimed(msg)))
1149    ///         .await
1150    /// }
1151    /// ```
1152    pub async fn apply(
1153        &mut self,
1154        message: MessageCx,
1155        cx: JrConnectionCx<H::Link>,
1156    ) -> Result<Handled<MessageCx>, crate::Error> {
1157        self.handler.handle_message(message, cx).await
1158    }
1159
1160    /// Convenience method to connect to a transport and serve.
1161    ///
1162    /// This is equivalent to:
1163    /// ```text
1164    /// handler_chain.connect_to(transport)?.serve().await
1165    /// ```
1166    pub async fn serve(
1167        self,
1168        transport: impl Component<<H::Link as JrLink>::ConnectsTo> + 'static,
1169    ) -> Result<(), crate::Error> {
1170        self.connect_to(transport)?.serve().await
1171    }
1172
1173    /// Convenience method to connect to a transport and run until a closure completes.
1174    ///
1175    /// This is equivalent to:
1176    /// ```text
1177    /// handler_chain.connect_to(transport)?.run_until(main_fn).await
1178    /// ```
1179    pub async fn run_until(
1180        self,
1181        transport: impl Component<<H::Link as JrLink>::ConnectsTo> + 'static,
1182        main_fn: impl AsyncFnOnce(JrConnectionCx<H::Link>) -> Result<(), crate::Error>,
1183    ) -> Result<(), crate::Error> {
1184        self.connect_to(transport)?.run_until(main_fn).await
1185    }
1186}
1187
1188impl<H, R> Component<H::Link> for JrConnectionBuilder<H, R>
1189where
1190    H: JrMessageHandler + 'static,
1191    R: JrResponder<H::Link> + 'static,
1192{
1193    async fn serve(
1194        self,
1195        client: impl Component<<H::Link as JrLink>::ConnectsTo>,
1196    ) -> Result<(), crate::Error> {
1197        self.connect_to(client)?.serve().await
1198    }
1199}
1200
1201/// A JSON-RPC connection with an active transport.
1202///
1203/// This type represents a `JrConnectionBuilder` that has been connected to a transport
1204/// via `connect_to()`. It can be driven in two modes:
1205///
1206/// - [`serve()`](Self::serve) - Run as a server, handling incoming messages until the connection closes
1207/// - [`run_until()`](Self::run_until) - Run until a closure completes, allowing you to send requests/notifications
1208///
1209/// Most users won't construct this directly - instead use `JrConnectionBuilder::connect_to()` or
1210/// `JrConnectionBuilder::serve()` for convenience.
1211pub struct JrConnection<H: JrMessageHandler, R: JrResponder<H::Link> = NullResponder> {
1212    cx: JrConnectionCx<H::Link>,
1213    name: Option<String>,
1214    outgoing_rx: mpsc::UnboundedReceiver<OutgoingMessage>,
1215    new_task_rx: mpsc::UnboundedReceiver<Task>,
1216    transport_outgoing_tx: mpsc::UnboundedSender<Result<jsonrpcmsg::Message, crate::Error>>,
1217    transport_incoming_rx: mpsc::UnboundedReceiver<Result<jsonrpcmsg::Message, crate::Error>>,
1218    dynamic_handler_rx: mpsc::UnboundedReceiver<DynamicHandlerMessage<H::Link>>,
1219    handler: H,
1220    responder: R,
1221}
1222
1223impl<H: JrMessageHandler, R: JrResponder<H::Link>> JrConnection<H, R> {
1224    /// Run the connection in server mode with the provided transport.
1225    ///
1226    /// This drives the connection by continuously processing messages from the transport
1227    /// and dispatching them to your registered handlers. The connection will run until:
1228    /// - The transport closes (e.g., EOF on byte streams)
1229    /// - An error occurs
1230    /// - One of your handlers returns an error
1231    ///
1232    /// The transport is responsible for serializing and deserializing `jsonrpcmsg::Message`
1233    /// values to/from the underlying I/O mechanism (byte streams, channels, etc.).
1234    ///
1235    /// Use this mode when you only need to respond to incoming messages and don't need
1236    /// to initiate your own requests. If you need to send requests to the other side,
1237    /// use [`run_until`](Self::run_until) instead.
1238    ///
1239    /// # Example: Byte Stream Transport
1240    ///
1241    /// ```no_run
1242    /// # use sacp::link::UntypedLink;
1243    /// # use sacp::{JrConnectionBuilder};
1244    /// # use sacp::ByteStreams;
1245    /// # use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
1246    /// # use sacp_test::*;
1247    /// # async fn example() -> Result<(), sacp::Error> {
1248    /// let transport = ByteStreams::new(
1249    ///     tokio::io::stdout().compat_write(),
1250    ///     tokio::io::stdin().compat(),
1251    /// );
1252    ///
1253    /// UntypedLink::builder()
1254    ///     .on_receive_request(async |req: MyRequest, request_cx, cx| {
1255    ///         request_cx.respond(MyResponse { status: "ok".into() })
1256    ///     }, sacp::on_receive_request!())
1257    ///     .serve(transport)
1258    ///     .await?;
1259    /// # Ok(())
1260    /// # }
1261    /// ```
1262    pub async fn serve(self) -> Result<(), crate::Error> {
1263        self.run_until(async move |_cx| future::pending().await)
1264            .await
1265    }
1266
1267    /// Run the connection until the provided closure completes.
1268    ///
1269    /// This drives the connection by:
1270    /// 1. Running your registered handlers in the background to process incoming messages
1271    /// 2. Executing your `main_fn` closure with a [`JrConnectionCx<R>`] for sending requests/notifications
1272    ///
1273    /// The connection stays active until your `main_fn` returns, then shuts down gracefully.
1274    /// If the connection closes unexpectedly before `main_fn` completes, this returns an error.
1275    ///
1276    /// Use this mode when you need to initiate communication (send requests/notifications)
1277    /// in addition to responding to incoming messages. For server-only mode where you just
1278    /// respond to messages, use [`serve`](Self::serve) instead.
1279    ///
1280    /// # Example
1281    ///
1282    /// ```no_run
1283    /// # use sacp::link::UntypedLink;
1284    /// # use sacp::{JrConnectionBuilder};
1285    /// # use sacp::ByteStreams;
1286    /// # use sacp::schema::InitializeRequest;
1287    /// # use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
1288    /// # use sacp_test::*;
1289    /// # async fn example() -> Result<(), sacp::Error> {
1290    /// let transport = ByteStreams::new(
1291    ///     tokio::io::stdout().compat_write(),
1292    ///     tokio::io::stdin().compat(),
1293    /// );
1294    ///
1295    /// UntypedLink::builder()
1296    ///     .on_receive_request(async |req: MyRequest, request_cx, cx| {
1297    ///         // Handle incoming requests in the background
1298    ///         request_cx.respond(MyResponse { status: "ok".into() })
1299    ///     }, sacp::on_receive_request!())
1300    ///     .connect_to(transport)?
1301    ///     .run_until(async |cx| {
1302    ///         // Initialize the protocol
1303    ///         let init_response = cx.send_request(InitializeRequest::make())
1304    ///             .block_task()
1305    ///             .await?;
1306    ///
1307    ///         // Send more requests...
1308    ///         let result = cx.send_request(MyRequest {})
1309    ///             .block_task()
1310    ///             .await?;
1311    ///
1312    ///         // When this closure returns, the connection shuts down
1313    ///         Ok(())
1314    ///     })
1315    ///     .await?;
1316    /// # Ok(())
1317    /// # }
1318    /// ```
1319    ///
1320    /// # Parameters
1321    ///
1322    /// - `main_fn`: Your client logic. Receives a [`JrConnectionCx<R>`] for sending messages.
1323    ///
1324    /// # Errors
1325    ///
1326    /// Returns an error if the connection closes before `main_fn` completes.
1327    pub async fn run_until<T>(
1328        self,
1329        main_fn: impl AsyncFnOnce(JrConnectionCx<H::Link>) -> Result<T, crate::Error>,
1330    ) -> Result<T, crate::Error> {
1331        let JrConnection {
1332            cx,
1333            name,
1334            outgoing_rx,
1335            new_task_rx,
1336            handler,
1337            responder,
1338            transport_outgoing_tx,
1339            transport_incoming_rx,
1340            dynamic_handler_rx,
1341        } = self;
1342        let (reply_tx, reply_rx) = mpsc::unbounded();
1343
1344        crate::util::instrument_with_connection_name(name, async move {
1345            let background = async {
1346                futures::try_join!(
1347                    // Protocol layer: OutgoingMessage → jsonrpcmsg::Message
1348                    outgoing_actor::outgoing_protocol_actor(
1349                        outgoing_rx,
1350                        reply_tx.clone(),
1351                        transport_outgoing_tx,
1352                    ),
1353                    // Protocol layer: jsonrpcmsg::Message → handler/reply routing
1354                    incoming_actor::incoming_protocol_actor(
1355                        &cx,
1356                        transport_incoming_rx,
1357                        dynamic_handler_rx,
1358                        reply_rx,
1359                        handler,
1360                    ),
1361                    task_actor::task_actor(new_task_rx, &cx),
1362                    responder.run(cx.clone()),
1363                )?;
1364                Ok(())
1365            };
1366
1367            crate::util::run_until(background, main_fn(cx.clone())).await
1368        })
1369        .await
1370    }
1371}
1372
1373/// The payload sent through the response oneshot channel.
1374///
1375/// Includes the response value and an optional ack channel for dispatch loop
1376/// synchronization.
1377pub(crate) struct ResponsePayload {
1378    /// The response result - either the JSON value or an error.
1379    pub(crate) result: Result<serde_json::Value, crate::Error>,
1380
1381    /// Optional acknowledgment channel for dispatch loop synchronization.
1382    ///
1383    /// When present, the receiver must send on this channel to signal that
1384    /// response processing is complete, allowing the dispatch loop to continue
1385    /// to the next message.
1386    ///
1387    /// This is `None` for error paths where the response is sent directly
1388    /// (e.g., when the outgoing channel is broken) rather than through the
1389    /// normal dispatch loop flow.
1390    pub(crate) ack_tx: Option<oneshot::Sender<()>>,
1391}
1392
1393impl std::fmt::Debug for ResponsePayload {
1394    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1395        f.debug_struct("ResponsePayload")
1396            .field("result", &self.result)
1397            .field("ack_tx", &self.ack_tx.as_ref().map(|_| "..."))
1398            .finish()
1399    }
1400}
1401
1402/// Message sent to the incoming actor for reply subscription management.
1403enum ReplyMessage {
1404    /// Subscribe to receive a response for the given request id.
1405    /// When a response with this id arrives, it will be sent through the oneshot
1406    /// along with an ack channel that must be signaled when processing is complete.
1407    Subscribe(jsonrpcmsg::Id, oneshot::Sender<ResponsePayload>),
1408}
1409
1410impl std::fmt::Debug for ReplyMessage {
1411    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1412        match self {
1413            ReplyMessage::Subscribe(id, _) => f.debug_tuple("Subscribe").field(id).finish(),
1414        }
1415    }
1416}
1417
1418/// Messages send to be serialized over the transport.
1419#[derive(Debug)]
1420enum OutgoingMessage {
1421    /// Send a request to the server.
1422    Request {
1423        /// method to use in the request
1424        method: String,
1425
1426        /// parameters for the request
1427        params: Option<jsonrpcmsg::Params>,
1428
1429        /// where to send the response when it arrives (includes ack channel)
1430        response_tx: oneshot::Sender<ResponsePayload>,
1431    },
1432
1433    /// Send a notification to the server.
1434    Notification {
1435        /// method to use in the request
1436        method: String,
1437
1438        /// parameters for the request
1439        params: Option<jsonrpcmsg::Params>,
1440    },
1441
1442    /// Send a reponse to a message from the server
1443    Response {
1444        id: jsonrpcmsg::Id,
1445
1446        response: Result<serde_json::Value, crate::Error>,
1447    },
1448
1449    /// Send a generalized error message
1450    Error { error: crate::Error },
1451}
1452
1453/// Return type from JrHandler; indicates whether the request was handled or not.
1454#[must_use]
1455pub enum Handled<T> {
1456    /// The message was handled
1457    Yes,
1458
1459    /// The message was not handled; returns the original value.
1460    ///
1461    /// If `retry` is true,
1462    No {
1463        /// The message to be passed to subsequent handlers
1464        /// (typically the original message, but it may have been
1465        /// mutated.)
1466        message: T,
1467
1468        /// If true, request the message to be queued and retried with
1469        /// dynamic handlers as they are added.
1470        ///
1471        /// This is used for managing session updates since the dynamic
1472        /// handler for a session cannot be added until the response to the
1473        /// new session request has been processed and there may be updates
1474        /// that get processed at the same time.
1475        retry: bool,
1476    },
1477}
1478
1479/// Trait for converting handler return values into [`Handled`].
1480///
1481/// This trait allows handlers to return either `()` (which becomes `Handled::Yes`)
1482/// or an explicit `Handled<T>` value for more control over handler propagation.
1483pub trait IntoHandled<T> {
1484    /// Convert this value into a `Handled<T>`.
1485    fn into_handled(self) -> Handled<T>;
1486}
1487
1488impl<T> IntoHandled<T> for () {
1489    fn into_handled(self) -> Handled<T> {
1490        Handled::Yes
1491    }
1492}
1493
1494impl<T> IntoHandled<T> for Handled<T> {
1495    fn into_handled(self) -> Handled<T> {
1496        self
1497    }
1498}
1499
1500/// Trait for types that can provide transport for JSON-RPC messages.
1501///
1502/// Implementations of this trait bridge between the internal protocol channels
1503/// (which carry `jsonrpcmsg::Message`) and the actual I/O mechanism (byte streams,
1504/// in-process channels, network sockets, etc.).
1505///
1506/// The transport layer is responsible only for moving `jsonrpcmsg::Message` in and out.
1507/// It has no knowledge of protocol semantics like request/response correlation, ID assignment,
1508/// or handler dispatch - those are handled by the protocol layer in `JrConnection`.
1509///
1510/// # Example
1511///
1512/// See [`ByteStreams`] for the standard byte stream implementation.
1513
1514/// Connection context for sending messages and spawning tasks.
1515///
1516/// This is the primary handle for interacting with the JSON-RPC connection from
1517/// within handler callbacks. You can use it to:
1518///
1519/// * Send requests and notifications to the other side
1520/// * Spawn concurrent tasks that run alongside the connection
1521/// * Respond to requests (via [`JrRequestCx`] which wraps this)
1522///
1523/// # Cloning
1524///
1525/// `JrConnectionCx` is cheaply cloneable - all clones refer to the same underlying connection.
1526/// This makes it easy to share across async tasks.
1527///
1528/// # Event Loop and Concurrency
1529///
1530/// Handler callbacks run on the event loop, which means the connection cannot process new
1531/// messages while your handler is running. Use [`spawn`](Self::spawn) to offload any
1532/// expensive or blocking work to concurrent tasks.
1533///
1534/// See the [Event Loop and Concurrency](JrConnection#event-loop-and-concurrency) section
1535/// for more details.
1536#[derive(Clone, Debug)]
1537pub struct JrConnectionCx<Link> {
1538    #[expect(dead_code)]
1539    role: Link,
1540    message_tx: OutgoingMessageTx,
1541    task_tx: TaskTx,
1542    dynamic_handler_tx: mpsc::UnboundedSender<DynamicHandlerMessage<Link>>,
1543}
1544
1545impl<Link: JrLink> JrConnectionCx<Link> {
1546    fn new(
1547        message_tx: mpsc::UnboundedSender<OutgoingMessage>,
1548        task_tx: mpsc::UnboundedSender<Task>,
1549        dynamic_handler_tx: mpsc::UnboundedSender<DynamicHandlerMessage<Link>>,
1550    ) -> Self {
1551        Self {
1552            role: Link::default(),
1553            message_tx,
1554            task_tx,
1555            dynamic_handler_tx,
1556        }
1557    }
1558
1559    /// Spawns a task that will run so long as the JSON-RPC connection is being served.
1560    ///
1561    /// This is the primary mechanism for offloading expensive work from handler callbacks
1562    /// to avoid blocking the event loop. Spawned tasks run concurrently with the connection,
1563    /// allowing the server to continue processing messages.
1564    ///
1565    /// # Event Loop
1566    ///
1567    /// Handler callbacks run on the event loop, which cannot process new messages while
1568    /// your handler is running. Use `spawn` for any expensive operations:
1569    ///
1570    /// ```no_run
1571    /// # use sacp_test::*;
1572    /// # async fn example() -> Result<(), sacp::Error> {
1573    /// # let connection = mock_connection();
1574    /// connection.on_receive_request(async |req: ProcessRequest, request_cx, cx| {
1575    ///     // Clone cx for the spawned task
1576    ///     cx.spawn({
1577    ///         let connection_cx = cx.clone();
1578    ///         async move {
1579    ///             let result = expensive_operation(&req.data).await?;
1580    ///             connection_cx.send_notification(ProcessComplete { result })?;
1581    ///             Ok(())
1582    ///         }
1583    ///     })?;
1584    ///
1585    ///     // Respond immediately
1586    ///     request_cx.respond(ProcessResponse { result: "started".into() })
1587    /// }, sacp::on_receive_request!())
1588    /// # .serve(sacp_test::MockTransport).await?;
1589    /// # Ok(())
1590    /// # }
1591    /// ```
1592    ///
1593    /// # Errors
1594    ///
1595    /// If the spawned task returns an error, the entire server will shut down.
1596    #[track_caller]
1597    pub fn spawn(
1598        &self,
1599        task: impl IntoFuture<Output = Result<(), crate::Error>, IntoFuture: Send + 'static>,
1600    ) -> Result<(), crate::Error> {
1601        let location = std::panic::Location::caller();
1602        let task = task.into_future();
1603        Task::new(location, task).spawn(&self.task_tx)
1604    }
1605
1606    /// Spawn a JSON-RPC connection in the background and return a [`JrConnectionCx`] for sending messages to it.
1607    ///
1608    /// This is useful for creating multiple connections that communicate with each other,
1609    /// such as implementing proxy patterns or connecting to multiple backend services.
1610    ///
1611    /// # Arguments
1612    ///
1613    /// - `connection`: The `JrConnection` to spawn (typically created via `JrConnectionBuilder::connect_to()`)
1614    /// - `serve_future`: A function that drives the connection (usually `|c| Box::pin(c.serve())`)
1615    ///
1616    /// # Returns
1617    ///
1618    /// A `JrConnectionCx` that you can use to send requests and notifications to the spawned connection.
1619    ///
1620    /// # Example: Proxying to a backend connection
1621    ///
1622    /// ```
1623    /// # use sacp::link::UntypedLink;
1624    /// # use sacp::{JrConnectionBuilder, JrConnectionCx};
1625    /// # use sacp_test::*;
1626    /// # async fn example(cx: JrConnectionCx<UntypedLink>) -> Result<(), sacp::Error> {
1627    /// // Set up a backend connection
1628    /// let backend = UntypedLink::builder()
1629    ///     .on_receive_request(async |req: MyRequest, request_cx, _cx| {
1630    ///         request_cx.respond(MyResponse { status: "ok".into() })
1631    ///     }, sacp::on_receive_request!())
1632    ///     .connect_to(MockTransport)?;
1633    ///
1634    /// // Spawn it and get a context to send requests to it
1635    /// let backend_cx = cx.spawn_connection(backend, |c| Box::pin(c.serve()))?;
1636    ///
1637    /// // Now you can forward requests to the backend
1638    /// let response = backend_cx.send_request(MyRequest {}).block_task().await?;
1639    /// # Ok(())
1640    /// # }
1641    /// ```
1642    #[track_caller]
1643    pub fn spawn_connection<H: JrMessageHandler, R: JrResponder<H::Link> + 'static>(
1644        &self,
1645        connection: JrConnection<H, R>,
1646        serve_future: impl FnOnce(JrConnection<H, R>) -> BoxFuture<'static, Result<(), crate::Error>>,
1647    ) -> Result<JrConnectionCx<H::Link>, crate::Error> {
1648        let cx = connection.cx.clone();
1649        let future = serve_future(connection);
1650        Task::new(std::panic::Location::caller(), future).spawn(&self.task_tx)?;
1651        Ok(cx)
1652    }
1653
1654    /// Send a request/notification and forward the response appropriately.
1655    ///
1656    /// The request context's response type matches the request's response type,
1657    /// enabling type-safe message forwarding.
1658    pub fn send_proxied_message_to<
1659        Peer: JrPeer,
1660        Req: JrRequest<Response: Send>,
1661        Notif: JrNotification,
1662    >(
1663        &self,
1664        peer: Peer,
1665        message: MessageCx<Req, Notif>,
1666    ) -> Result<(), crate::Error>
1667    where
1668        Link: HasPeer<Peer>,
1669    {
1670        match message {
1671            MessageCx::Request(request, request_cx) => self
1672                .send_request_to(peer, request)
1673                .forward_to_request_cx(request_cx),
1674            MessageCx::Notification(notification) => self.send_notification_to(peer, notification),
1675        }
1676    }
1677
1678    /// Send an outgoing request and return a [`JrResponse`] for handling the reply.
1679    ///
1680    /// The returned [`JrResponse`] provides methods for receiving the response without
1681    /// blocking the event loop:
1682    ///
1683    /// * [`on_receiving_result`](JrResponse::on_receiving_result) - Schedule
1684    ///   a callback to run when the response arrives (doesn't block the event loop)
1685    /// * [`block_task`](JrResponse::block_task) - Block the current task until the response
1686    ///   arrives (only safe in spawned tasks, not in handlers)
1687    ///
1688    /// # Anti-Footgun Design
1689    ///
1690    /// The API intentionally makes it difficult to block on the result directly to prevent
1691    /// the common mistake of blocking the event loop while waiting for a response:
1692    ///
1693    /// ```compile_fail
1694    /// # use sacp_test::*;
1695    /// # async fn example(cx: sacp::JrConnectionCx<sacp::link::UntypedLink>) -> Result<(), sacp::Error> {
1696    /// // ❌ This doesn't compile - prevents blocking the event loop
1697    /// let response = cx.send_request(MyRequest {}).await?;
1698    /// # Ok(())
1699    /// # }
1700    /// ```
1701    ///
1702    /// ```no_run
1703    /// # use sacp_test::*;
1704    /// # async fn example(cx: sacp::JrConnectionCx<sacp::link::UntypedLink>) -> Result<(), sacp::Error> {
1705    /// // ✅ Option 1: Schedule callback (safe in handlers)
1706    /// cx.send_request(MyRequest {})
1707    ///     .on_receiving_result(async |result| {
1708    ///         // Handle the response
1709    ///         Ok(())
1710    ///     })?;
1711    ///
1712    /// // ✅ Option 2: Block in spawned task (safe because task is concurrent)
1713    /// cx.spawn({
1714    ///     let cx = cx.clone();
1715    ///     async move {
1716    ///         let response = cx.send_request(MyRequest {})
1717    ///             .block_task()
1718    ///             .await?;
1719    ///         // Process response...
1720    ///         Ok(())
1721    ///     }
1722    /// })?;
1723    /// # Ok(())
1724    /// # }
1725    /// ```
1726    /// Send an outgoing request to the default counterpart peer.
1727    ///
1728    /// This is a convenience method that uses the connection's default peer.
1729    /// For explicit control over the target peer, use [`send_request_to`](Self::send_request_to).
1730    pub fn send_request<Req: JrRequest>(&self, request: Req) -> JrResponse<Req::Response>
1731    where
1732        Link: HasDefaultPeer,
1733    {
1734        self.send_request_to(Link::DefaultPeer::default(), request)
1735    }
1736
1737    /// Send an outgoing request to a specific counterpart peer.
1738    ///
1739    /// The message will be transformed according to the link's [`HasPeer`](crate::HasPeer)
1740    /// implementation before being sent.
1741    pub fn send_request_to<Peer: JrPeer, Req: JrRequest>(
1742        &self,
1743        peer: Peer,
1744        request: Req,
1745    ) -> JrResponse<Req::Response>
1746    where
1747        Link: HasPeer<Peer>,
1748    {
1749        let method = request.method().to_string();
1750        let (response_tx, response_rx) = oneshot::channel();
1751        match Link::remote_style(peer).transform_outgoing_message(request) {
1752            Ok(untyped) => {
1753                // Transform the message for the target role
1754                let params = crate::util::json_cast(untyped.params).ok();
1755                let message = OutgoingMessage::Request {
1756                    method: untyped.method.clone(),
1757                    params,
1758                    response_tx,
1759                };
1760
1761                match self.message_tx.unbounded_send(message) {
1762                    Ok(()) => (),
1763                    Err(error) => {
1764                        let OutgoingMessage::Request {
1765                            method,
1766                            response_tx,
1767                            ..
1768                        } = error.into_inner()
1769                        else {
1770                            unreachable!();
1771                        };
1772
1773                        response_tx
1774                            .send(ResponsePayload {
1775                                result: Err(crate::util::internal_error(format!(
1776                                    "failed to send outgoing request `{method}"
1777                                ))),
1778                                ack_tx: None,
1779                            })
1780                            .unwrap();
1781                    }
1782                }
1783            }
1784
1785            Err(err) => {
1786                response_tx
1787                    .send(ResponsePayload {
1788                        result: Err(crate::util::internal_error(format!(
1789                            "failed to create untyped request for `{method}`: {err}"
1790                        ))),
1791                        ack_tx: None,
1792                    })
1793                    .unwrap();
1794            }
1795        }
1796
1797        JrResponse::new(method.clone(), self.task_tx.clone(), response_rx)
1798            .map(move |json| <Req::Response>::from_value(&method, json))
1799    }
1800
1801    /// Send an outgoing notification to the default counterpart peer (no reply expected).
1802    ///
1803    /// Notifications are fire-and-forget messages that don't have IDs and don't expect responses.
1804    /// This method sends the notification immediately and returns.
1805    ///
1806    /// This is a convenience method that uses the link's default peer.
1807    /// For explicit control over the target peer, use [`send_notification_to`](Self::send_notification_to).
1808    ///
1809    /// ```no_run
1810    /// # use sacp_test::*;
1811    /// # async fn example(cx: sacp::JrConnectionCx<sacp::link::UntypedLink>) -> Result<(), sacp::Error> {
1812    /// cx.send_notification(StatusUpdate {
1813    ///     message: "Processing...".into(),
1814    /// })?;
1815    /// # Ok(())
1816    /// # }
1817    /// ```
1818    pub fn send_notification<N: JrNotification>(&self, notification: N) -> Result<(), crate::Error>
1819    where
1820        Link: HasDefaultPeer,
1821    {
1822        self.send_notification_to(Link::DefaultPeer::default(), notification)
1823    }
1824
1825    /// Send an outgoing notification to a specific counterpart peer (no reply expected).
1826    ///
1827    /// The message will be transformed according to the link's [`HasPeer`](crate::HasPeer)
1828    /// implementation before being sent.
1829    pub fn send_notification_to<Peer: JrPeer, N: JrNotification>(
1830        &self,
1831        peer: Peer,
1832        notification: N,
1833    ) -> Result<(), crate::Error>
1834    where
1835        Link: HasPeer<Peer>,
1836    {
1837        let remote_style = Link::remote_style(peer);
1838        tracing::debug!(
1839            role = std::any::type_name::<Link>(),
1840            peer = std::any::type_name::<Peer>(),
1841            notification_type = std::any::type_name::<N>(),
1842            ?remote_style,
1843            original_method = notification.method(),
1844            "send_notification_to"
1845        );
1846        let transformed = remote_style.transform_outgoing_message(notification)?;
1847        tracing::debug!(
1848            transformed_method = %transformed.method,
1849            "send_notification_to transformed"
1850        );
1851        let params = crate::util::json_cast(transformed.params).ok();
1852        send_raw_message(
1853            &self.message_tx,
1854            OutgoingMessage::Notification {
1855                method: transformed.method,
1856                params,
1857            },
1858        )
1859    }
1860
1861    /// Send an error notification (no reply expected).
1862    pub fn send_error_notification(&self, error: crate::Error) -> Result<(), crate::Error> {
1863        send_raw_message(&self.message_tx, OutgoingMessage::Error { error })
1864    }
1865
1866    /// Register a dynamic message handler, used to intercept messages specific to a particular session
1867    /// or some similar modal thing.
1868    ///
1869    /// Dynamic message handlers are called first for every incoming message.
1870    ///
1871    /// If they decline to handle the message, then the message is passed to the regular registered handlers.
1872    ///
1873    /// The handler will stay registered until the returned registration guard is dropped.
1874    pub fn add_dynamic_handler(
1875        &self,
1876        handler: impl JrMessageHandler<Link = Link> + 'static,
1877    ) -> Result<DynamicHandlerRegistration<Link>, crate::Error> {
1878        let uuid = Uuid::new_v4();
1879        self.dynamic_handler_tx
1880            .unbounded_send(DynamicHandlerMessage::AddDynamicHandler(
1881                uuid.clone(),
1882                Box::new(handler),
1883            ))
1884            .map_err(crate::util::internal_error)?;
1885
1886        Ok(DynamicHandlerRegistration::new(uuid, self.clone()))
1887    }
1888
1889    fn remove_dynamic_handler(&self, uuid: Uuid) {
1890        match self
1891            .dynamic_handler_tx
1892            .unbounded_send(DynamicHandlerMessage::RemoveDynamicHandler(uuid))
1893        {
1894            Ok(_) => (),
1895            Err(_) => ( /* ignore errors */),
1896        }
1897    }
1898}
1899
1900#[derive(Clone, Debug)]
1901pub struct DynamicHandlerRegistration<Link: JrLink> {
1902    uuid: Uuid,
1903    cx: JrConnectionCx<Link>,
1904}
1905
1906impl<Link: JrLink> DynamicHandlerRegistration<Link> {
1907    fn new(uuid: Uuid, cx: JrConnectionCx<Link>) -> Self {
1908        Self { uuid, cx }
1909    }
1910
1911    /// Prevents the dynamic handler from being removed when dropped.
1912    pub fn run_indefinitely(self) {
1913        std::mem::forget(self)
1914    }
1915}
1916
1917impl<Link: JrLink> Drop for DynamicHandlerRegistration<Link> {
1918    fn drop(&mut self) {
1919        self.cx.remove_dynamic_handler(self.uuid.clone());
1920    }
1921}
1922
1923/// The context to respond to an incoming request.
1924///
1925/// This context is provided to request handlers and serves a dual role:
1926///
1927/// 1. **Respond to the request** - Use [`respond`](Self::respond) or
1928///    [`respond_with_result`](Self::respond_with_result) to send the response
1929/// 2. **Send other messages** - Use the [`JrConnectionCx`] parameter passed to your
1930///    handler, which provides [`send_request`](`JrConnectionCx::send_request`),
1931///    [`send_notification`](`JrConnectionCx::send_notification`), and
1932///    [`spawn`](`JrConnectionCx::spawn`)
1933///
1934/// # Example
1935///
1936/// ```no_run
1937/// # use sacp_test::*;
1938/// # async fn example() -> Result<(), sacp::Error> {
1939/// # let connection = mock_connection();
1940/// connection.on_receive_request(async |req: ProcessRequest, request_cx, cx| {
1941///     // Send a notification while processing
1942///     cx.send_notification(StatusUpdate {
1943///         message: "processing".into(),
1944///     })?;
1945///
1946///     // Do some work...
1947///     let result = process(&req.data)?;
1948///
1949///     // Respond to the request
1950///     request_cx.respond(ProcessResponse { result })
1951/// }, sacp::on_receive_request!())
1952/// # .serve(sacp_test::MockTransport).await?;
1953/// # Ok(())
1954/// # }
1955/// ```
1956///
1957/// # Event Loop Considerations
1958///
1959/// Like all handlers, request handlers run on the event loop. Use
1960/// [`spawn`](JrConnectionCx::spawn) for expensive operations to avoid blocking
1961/// the connection.
1962///
1963/// See the [Event Loop and Concurrency](JrConnection#event-loop-and-concurrency)
1964/// section for more details.
1965#[must_use]
1966pub struct JrRequestCx<T: JrResponsePayload = serde_json::Value> {
1967    /// The context to use to send outgoing messages and replies.
1968    message_tx: OutgoingMessageTx,
1969    /// The method of the request.
1970    method: String,
1971
1972    /// The `id` of the message we are replying to.
1973    id: jsonrpcmsg::Id,
1974
1975    /// Function to send the response `T` to a request with the given method and id.
1976    make_json: SendBoxFnOnce<
1977        'static,
1978        (String, Result<T, crate::Error>),
1979        Result<serde_json::Value, crate::Error>,
1980    >,
1981}
1982
1983impl<T: JrResponsePayload> std::fmt::Debug for JrRequestCx<T> {
1984    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1985        f.debug_struct("JrRequestCx")
1986            .field("method", &self.method)
1987            .field("id", &self.id)
1988            .field("response_type", &std::any::type_name::<T>())
1989            .finish()
1990    }
1991}
1992
1993impl JrRequestCx<serde_json::Value> {
1994    /// Create a new method context.
1995    fn new(message_tx: OutgoingMessageTx, method: String, id: jsonrpcmsg::Id) -> Self {
1996        Self {
1997            message_tx,
1998            method,
1999            id,
2000            make_json: SendBoxFnOnce::new(move |_method, value| value),
2001        }
2002    }
2003
2004    /// Cast this request context to a different response type
2005    pub fn cast<T: JrResponsePayload>(self) -> JrRequestCx<T> {
2006        self.wrap_params(move |method, value| match value {
2007            Ok(value) => T::into_json(value, &method),
2008            Err(e) => Err(e),
2009        })
2010    }
2011}
2012
2013impl<T: JrResponsePayload> JrRequestCx<T> {
2014    /// Method of the incoming request
2015    pub fn method(&self) -> &str {
2016        &self.method
2017    }
2018
2019    /// ID of the incoming request as a JSON value
2020    pub fn id(&self) -> serde_json::Value {
2021        match &self.id {
2022            jsonrpcmsg::Id::Number(n) => serde_json::Value::Number((*n).into()),
2023            jsonrpcmsg::Id::String(s) => serde_json::Value::String(s.clone()),
2024            jsonrpcmsg::Id::Null => serde_json::Value::Null,
2025        }
2026    }
2027
2028    /// Convert to a `JrRequestCx` that expects a JSON value
2029    /// and which checks (dynamically) that the JSON value it receives
2030    /// can be converted to `T`.
2031    pub fn erase_to_json(self) -> JrRequestCx<serde_json::Value> {
2032        self.wrap_params(|method, value| T::from_value(&method, value?))
2033    }
2034
2035    /// Return a new JrResponse that expects a response of type U and serializes it.
2036    pub fn wrap_method(self, method: String) -> JrRequestCx<T> {
2037        JrRequestCx {
2038            message_tx: self.message_tx,
2039            method,
2040            id: self.id,
2041            make_json: self.make_json,
2042        }
2043    }
2044
2045    /// Return a new JrResponse that expects a response of type U and serializes it.
2046    ///
2047    /// `wrap_fn` will be invoked with the method name and the result of the wrapped function.
2048    pub fn wrap_params<U: JrResponsePayload>(
2049        self,
2050        wrap_fn: impl FnOnce(&str, Result<U, crate::Error>) -> Result<T, crate::Error> + Send + 'static,
2051    ) -> JrRequestCx<U> {
2052        JrRequestCx {
2053            message_tx: self.message_tx,
2054            method: self.method,
2055            id: self.id,
2056            make_json: SendBoxFnOnce::new(move |method: String, input: Result<U, crate::Error>| {
2057                let t_value = wrap_fn(&method, input);
2058                self.make_json.call(method, t_value)
2059            }),
2060        }
2061    }
2062
2063    /// Respond to the JSON-RPC request with either a value (`Ok`) or an error (`Err`).
2064    pub fn respond_with_result(
2065        self,
2066        response: Result<T, crate::Error>,
2067    ) -> Result<(), crate::Error> {
2068        tracing::debug!(id = ?self.id, "respond called");
2069        let json = self.make_json.call_tuple((self.method.clone(), response));
2070        send_raw_message(
2071            &self.message_tx,
2072            OutgoingMessage::Response {
2073                id: self.id,
2074                response: json,
2075            },
2076        )
2077    }
2078
2079    /// Respond to the JSON-RPC request with a value.
2080    pub fn respond(self, response: T) -> Result<(), crate::Error> {
2081        self.respond_with_result(Ok(response))
2082    }
2083
2084    /// Respond to the JSON-RPC request with an internal error containing a message.
2085    pub fn respond_with_internal_error(self, message: impl ToString) -> Result<(), crate::Error> {
2086        self.respond_with_error(crate::util::internal_error(message))
2087    }
2088
2089    /// Respond to the JSON-RPC request with an error.
2090    pub fn respond_with_error(self, error: crate::Error) -> Result<(), crate::Error> {
2091        tracing::debug!(id = ?self.id, ?error, "respond_with_error called");
2092        self.respond_with_result(Err(error))
2093    }
2094}
2095
2096/// Common bounds for any JSON-RPC message.
2097///
2098/// # Derive Macro
2099///
2100/// For simple message types, you can use the `JrRequest` or `JrNotification` derive macros
2101/// which will implement both `JrMessage` and the respective trait. See [`JrRequest`] and
2102/// [`JrNotification`] for examples.
2103pub trait JrMessage: 'static + Debug + Sized + Send + Clone {
2104    /// The method name for the message.
2105    fn method(&self) -> &str;
2106
2107    /// Convert this message into an untyped message.
2108    fn to_untyped_message(&self) -> Result<UntypedMessage, crate::Error>;
2109
2110    /// Attempt to parse this type from a method name and parameters.
2111    ///
2112    /// Returns:
2113    /// - `None` if this type does not recognize the method name
2114    /// - `Some(Ok(value))` if the method is recognized and deserialization succeeds
2115    /// - `Some(Err(error))` if the method is recognized but deserialization fails
2116    fn parse_message(method: &str, params: &impl Serialize) -> Option<Result<Self, crate::Error>>;
2117}
2118
2119/// Defines the "payload" of a successful response to a JSON-RPC request.
2120///
2121/// # Derive Macro
2122///
2123/// Use `#[derive(JrResponsePayload)]` to automatically implement this trait:
2124///
2125/// ```ignore
2126/// use sacp::JrResponsePayload;
2127/// use serde::{Serialize, Deserialize};
2128///
2129/// #[derive(Debug, Serialize, Deserialize, JrResponsePayload)]
2130/// struct HelloResponse {
2131///     greeting: String,
2132/// }
2133/// ```
2134pub trait JrResponsePayload: 'static + Debug + Sized + Send {
2135    /// Convert this message into a JSON value.
2136    fn into_json(self, method: &str) -> Result<serde_json::Value, crate::Error>;
2137
2138    /// Parse a JSON value into the response type.
2139    fn from_value(method: &str, value: serde_json::Value) -> Result<Self, crate::Error>;
2140}
2141
2142impl JrResponsePayload for serde_json::Value {
2143    fn from_value(_method: &str, value: serde_json::Value) -> Result<Self, crate::Error> {
2144        Ok(value)
2145    }
2146
2147    fn into_json(self, _method: &str) -> Result<serde_json::Value, crate::Error> {
2148        Ok(self)
2149    }
2150}
2151
2152/// A struct that represents a notification (JSON-RPC message that does not expect a response).
2153///
2154/// # Derive Macro
2155///
2156/// Use `#[derive(JrNotification)]` to automatically implement both `JrMessage` and `JrNotification`:
2157///
2158/// ```ignore
2159/// use sacp::JrNotification;
2160/// use serde::{Serialize, Deserialize};
2161///
2162/// #[derive(Debug, Clone, Serialize, Deserialize, JrNotification)]
2163/// #[notification(method = "_ping")]
2164/// struct PingNotification {
2165///     timestamp: u64,
2166/// }
2167/// ```
2168pub trait JrNotification: JrMessage {}
2169
2170/// A struct that represents a request (JSON-RPC message expecting a response).
2171///
2172/// # Derive Macro
2173///
2174/// Use `#[derive(JrRequest)]` to automatically implement both `JrMessage` and `JrRequest`:
2175///
2176/// ```ignore
2177/// use sacp::{JrRequest, JrResponsePayload};
2178/// use serde::{Serialize, Deserialize};
2179///
2180/// #[derive(Debug, Clone, Serialize, Deserialize, JrRequest)]
2181/// #[request(method = "_hello", response = HelloResponse)]
2182/// struct HelloRequest {
2183///     name: String,
2184/// }
2185///
2186/// #[derive(Debug, Serialize, Deserialize, JrResponsePayload)]
2187/// struct HelloResponse {
2188///     greeting: String,
2189/// }
2190/// ```
2191pub trait JrRequest: JrMessage {
2192    /// The type of data expected in response.
2193    type Response: JrResponsePayload;
2194}
2195
2196/// An enum capturing an in-flight request or notification.
2197/// In the case of a request, also includes the context used to respond to the request.
2198///
2199/// Type parameters allow specifying the concrete request and notification types.
2200/// By default, both are `UntypedMessage` for dynamic dispatch.
2201/// The request context's response type matches the request's response type.
2202#[derive(Debug)]
2203pub enum MessageCx<Req: JrRequest = UntypedMessage, Notif: JrMessage = UntypedMessage> {
2204    /// Incoming request and the context where the response should be sent.
2205    Request(Req, JrRequestCx<Req::Response>),
2206
2207    /// Incoming notification.
2208    Notification(Notif),
2209}
2210
2211impl<Req: JrRequest, Notif: JrMessage> MessageCx<Req, Notif> {
2212    /// Map the request and notification types to new types.
2213    pub fn map<Req1, Notif1>(
2214        self,
2215        map_request: impl FnOnce(Req, JrRequestCx<Req::Response>) -> (Req1, JrRequestCx<Req1::Response>),
2216        map_notification: impl FnOnce(Notif) -> Notif1,
2217    ) -> MessageCx<Req1, Notif1>
2218    where
2219        Req1: JrRequest<Response: Send>,
2220        Notif1: JrMessage,
2221    {
2222        match self {
2223            MessageCx::Request(request, cx) => {
2224                let (new_request, new_cx) = map_request(request, cx);
2225                MessageCx::Request(new_request, new_cx)
2226            }
2227            MessageCx::Notification(notification) => {
2228                let new_notification = map_notification(notification);
2229                MessageCx::Notification(new_notification)
2230            }
2231        }
2232    }
2233
2234    /// Respond to the message with an error.
2235    ///
2236    /// If this message is a request, this error becomes the reply to the request.
2237    ///
2238    /// If this message is a notification, the error is sent as a notification.
2239    pub fn respond_with_error<Link: JrLink>(
2240        self,
2241        error: crate::Error,
2242        cx: JrConnectionCx<Link>,
2243    ) -> Result<(), crate::Error> {
2244        match self {
2245            MessageCx::Request(_, request_cx) => request_cx.respond_with_error(error),
2246            MessageCx::Notification(_) => cx.send_error_notification(error),
2247        }
2248    }
2249
2250    /// Convert to a `JrRequestCx` that expects a JSON value
2251    /// and which checks (dynamically) that the JSON value it receives
2252    /// can be converted to `T`.
2253    pub fn erase_to_json(self) -> Result<MessageCx, crate::Error> {
2254        match self {
2255            MessageCx::Request(response, request_cx) => Ok(MessageCx::Request(
2256                response.to_untyped_message()?,
2257                request_cx.erase_to_json(),
2258            )),
2259            MessageCx::Notification(notification) => {
2260                Ok(MessageCx::Notification(notification.to_untyped_message()?))
2261            }
2262        }
2263    }
2264
2265    /// Convert the message in self to an untyped message.
2266    pub fn to_untyped_message(&self) -> Result<UntypedMessage, crate::Error> {
2267        match self {
2268            MessageCx::Request(request, _) => request.to_untyped_message(),
2269            MessageCx::Notification(notification) => notification.to_untyped_message(),
2270        }
2271    }
2272
2273    /// Convert self to an untyped message context.
2274    pub fn into_untyped_message_cx(self) -> Result<MessageCx, crate::Error> {
2275        match self {
2276            MessageCx::Request(request, request_cx) => Ok(MessageCx::Request(
2277                request.to_untyped_message()?,
2278                request_cx.erase_to_json(),
2279            )),
2280            MessageCx::Notification(notification) => {
2281                Ok(MessageCx::Notification(notification.to_untyped_message()?))
2282            }
2283        }
2284    }
2285
2286    /// Returns the request ID if this is a request, None if notification.
2287    pub fn id(&self) -> Option<serde_json::Value> {
2288        match self {
2289            MessageCx::Request(_, cx) => Some(cx.id()),
2290            MessageCx::Notification(_) => None,
2291        }
2292    }
2293
2294    /// Returns the method of the message (only available for UntypedMessage).
2295    pub fn method(&self) -> &str {
2296        match self {
2297            MessageCx::Request(msg, _) => msg.method(),
2298            MessageCx::Notification(msg) => msg.method(),
2299        }
2300    }
2301}
2302
2303impl MessageCx {
2304    /// Attempts to parse `self` into a typed message context.
2305    ///
2306    /// # Returns
2307    ///
2308    /// * `Ok(Ok(typed))` if this is a request/notification of the given types
2309    /// * `Ok(Err(self))` if not
2310    /// * `Err` if has the correct method for the given types but parsing fails
2311    pub(crate) fn into_typed_message_cx<Req: JrRequest, Notif: JrNotification>(
2312        self,
2313    ) -> Result<Result<MessageCx<Req, Notif>, MessageCx>, crate::Error> {
2314        match self {
2315            MessageCx::Request(message, request_cx) => {
2316                tracing::debug!(
2317                    request_type = std::any::type_name::<Req>(),
2318                    message = ?message,
2319                    "MessageHandler::handle_request"
2320                );
2321                match Req::parse_message(&message.method, &message.params) {
2322                    Some(Ok(req)) => {
2323                        tracing::trace!(?req, "MessageHandler::handle_request: parse completed");
2324                        Ok(Ok(MessageCx::Request(req, request_cx.cast())))
2325                    }
2326                    Some(Err(err)) => {
2327                        tracing::trace!(?err, "MessageHandler::handle_request: parse errored");
2328                        return Err(err);
2329                    }
2330                    None => {
2331                        tracing::trace!("MessageHandler::handle_request: parse failed");
2332                        Ok(Err(MessageCx::Request(message, request_cx)))
2333                    }
2334                }
2335            }
2336
2337            MessageCx::Notification(message) => {
2338                tracing::debug!(
2339                    notification_type = std::any::type_name::<Notif>(),
2340                    message = ?message,
2341                    "MessageHandler::handle_notification"
2342                );
2343                match Notif::parse_message(&message.method, &message.params) {
2344                    Some(Ok(notif)) => {
2345                        tracing::trace!(
2346                            ?notif,
2347                            "MessageHandler::handle_notification: parse completed"
2348                        );
2349                        Ok(Ok(MessageCx::Notification(notif)))
2350                    }
2351                    Some(Err(err)) => {
2352                        tracing::trace!(?err, "MessageHandler: parse errored");
2353                        Err(err)
2354                    }
2355                    None => {
2356                        tracing::trace!("MessageHandler: parse failed");
2357                        Ok(Err(MessageCx::Notification(message)))
2358                    }
2359                }
2360            }
2361        }
2362    }
2363
2364    /// True if this message has a session-id field
2365    pub fn has_field(&self, field_name: &str) -> bool {
2366        self.message().params().get(field_name).is_some()
2367    }
2368
2369    /// Extract the ACP session-id from this message (if any).
2370    pub(crate) fn has_session_id(&self) -> bool {
2371        self.has_field("sessionId")
2372    }
2373
2374    /// Extract the ACP session-id from this message (if any).
2375    pub(crate) fn get_session_id(&self) -> Result<Option<SessionId>, crate::Error> {
2376        let value = match self.message().params().get("sessionId") {
2377            Some(value) => value,
2378            None => return Ok(None),
2379        };
2380        let session_id = serde_json::from_value(value.clone())?;
2381        Ok(Some(session_id))
2382    }
2383
2384    /// Try to parse this as a notification of the given type.
2385    ///
2386    /// # Returns
2387    ///
2388    /// * `Ok(Ok(typed))` if this is a request/notification of the given types
2389    /// * `Ok(Err(self))` if not
2390    /// * `Err` if has the correct method for the given types but parsing fails
2391    pub fn into_notification<N: JrNotification>(
2392        self,
2393    ) -> Result<Result<N, MessageCx>, crate::Error> {
2394        match self {
2395            MessageCx::Request(..) => Ok(Err(self)),
2396            MessageCx::Notification(msg) => match N::parse_message(&msg.method, &msg.params) {
2397                Some(Ok(n)) => Ok(Ok(n)),
2398                Some(Err(err)) => Err(err),
2399                None => Ok(Err(MessageCx::Notification(msg))),
2400            },
2401        }
2402    }
2403
2404    /// Try to parse this as a request of the given type.
2405    ///
2406    /// # Returns
2407    ///
2408    /// * `Ok(Ok(typed))` if this is a request/notification of the given types
2409    /// * `Ok(Err(self))` if not
2410    /// * `Err` if has the correct method for the given types but parsing fails
2411    pub fn into_request<Req: JrRequest>(
2412        self,
2413    ) -> Result<Result<(Req, JrRequestCx<Req::Response>), MessageCx>, crate::Error> {
2414        match self {
2415            MessageCx::Request(msg, request_cx) => {
2416                match Req::parse_message(&msg.method, &msg.params) {
2417                    Some(Ok(req)) => Ok(Ok((req, request_cx.cast()))),
2418                    Some(Err(err)) => Err(err),
2419                    None => Ok(Err(MessageCx::Request(msg, request_cx))),
2420                }
2421            }
2422            MessageCx::Notification(..) => Ok(Err(self)),
2423        }
2424    }
2425}
2426
2427impl<M: JrRequest + JrNotification> MessageCx<M, M> {
2428    /// Returns the message of the message (only available for UntypedMessage).
2429    pub fn message(&self) -> &M {
2430        match self {
2431            MessageCx::Request(msg, _) => msg,
2432            MessageCx::Notification(msg) => msg,
2433        }
2434    }
2435
2436    /// Map the request/notification message.
2437    pub(crate) fn try_map_message(
2438        self,
2439        map_message: impl FnOnce(M) -> Result<M, crate::Error>,
2440    ) -> Result<MessageCx<M, M>, crate::Error> {
2441        match self {
2442            MessageCx::Request(request, cx) => Ok(MessageCx::Request(map_message(request)?, cx)),
2443            MessageCx::Notification(notification) => {
2444                Ok(MessageCx::<M, M>::Notification(map_message(notification)?))
2445            }
2446        }
2447    }
2448}
2449
2450/// An incoming JSON message without any typing. Can be a request or a notification.
2451#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
2452pub struct UntypedMessage {
2453    /// The JSON-RPC method name
2454    pub method: String,
2455    /// The JSON-RPC parameters as a raw JSON value
2456    pub params: serde_json::Value,
2457}
2458
2459impl UntypedMessage {
2460    /// Returns an untyped message with the given method and parameters.
2461    pub fn new(method: &str, params: impl Serialize) -> Result<Self, crate::Error> {
2462        let params = serde_json::to_value(params)?;
2463        Ok(Self {
2464            method: method.to_string(),
2465            params,
2466        })
2467    }
2468
2469    /// Returns the method name
2470    pub fn method(&self) -> &str {
2471        &self.method
2472    }
2473
2474    /// Returns the parameters as a JSON value
2475    pub fn params(&self) -> &serde_json::Value {
2476        &self.params
2477    }
2478
2479    /// Consumes this message and returns the method and params
2480    pub fn into_parts(self) -> (String, serde_json::Value) {
2481        (self.method, self.params)
2482    }
2483}
2484
2485impl JrMessage for UntypedMessage {
2486    fn method(&self) -> &str {
2487        &self.method
2488    }
2489
2490    fn to_untyped_message(&self) -> Result<UntypedMessage, crate::Error> {
2491        Ok(self.clone())
2492    }
2493
2494    fn parse_message(method: &str, params: &impl Serialize) -> Option<Result<Self, crate::Error>> {
2495        Some(UntypedMessage::new(method, params))
2496    }
2497}
2498
2499impl JrRequest for UntypedMessage {
2500    type Response = serde_json::Value;
2501}
2502
2503impl JrNotification for UntypedMessage {}
2504
2505/// Represents a pending response of type `R` from an outgoing request.
2506///
2507/// Returned by [`JrConnectionCx::send_request`], this type provides methods for handling
2508/// the response without blocking the event loop. The API is intentionally designed to make
2509/// it difficult to accidentally block.
2510///
2511/// # Anti-Footgun Design
2512///
2513/// You cannot directly `.await` a `JrResponse`. Instead, you must choose how to handle
2514/// the response:
2515///
2516/// ## Option 1: Schedule a Callback (Safe in Handlers)
2517///
2518/// Use [`on_receiving_result`](Self::on_receiving_result) to schedule a task
2519/// that runs when the response arrives. This doesn't block the event loop:
2520///
2521/// ```no_run
2522/// # use sacp_test::*;
2523/// # async fn example(cx: sacp::JrConnectionCx<sacp::link::UntypedLink>) -> Result<(), sacp::Error> {
2524/// cx.send_request(MyRequest {})
2525///     .on_receiving_result(async |result| {
2526///         match result {
2527///             Ok(response) => {
2528///                 // Handle successful response
2529///                 Ok(())
2530///             }
2531///             Err(error) => {
2532///                 // Handle error
2533///                 Err(error)
2534///             }
2535///         }
2536///     })?;
2537/// # Ok(())
2538/// # }
2539/// ```
2540///
2541/// ## Option 2: Block in a Spawned Task (Safe Only in `spawn`)
2542///
2543/// Use [`block_task`](Self::block_task) to block until the response arrives, but **only**
2544/// in a spawned task (never in a handler):
2545///
2546/// ```no_run
2547/// # use sacp_test::*;
2548/// # async fn example(cx: sacp::JrConnectionCx<sacp::link::UntypedLink>) -> Result<(), sacp::Error> {
2549/// // ✅ Safe: Spawned task runs concurrently
2550/// cx.spawn({
2551///     let cx = cx.clone();
2552///     async move {
2553///         let response = cx.send_request(MyRequest {})
2554///             .block_task()
2555///             .await?;
2556///         // Process response...
2557///         Ok(())
2558///     }
2559/// })?;
2560/// # Ok(())
2561/// # }
2562/// ```
2563///
2564/// ```no_run
2565/// # use sacp_test::*;
2566/// # async fn example() -> Result<(), sacp::Error> {
2567/// # let connection = mock_connection();
2568/// // ❌ NEVER do this in a handler - blocks the event loop!
2569/// connection.on_receive_request(async |req: MyRequest, request_cx, cx| {
2570///     let response = cx.send_request(MyRequest {})
2571///         .block_task()  // This will deadlock!
2572///         .await?;
2573///     request_cx.respond(response)
2574/// }, sacp::on_receive_request!())
2575/// # .serve(sacp_test::MockTransport).await?;
2576/// # Ok(())
2577/// # }
2578/// ```
2579///
2580/// # Why This Design?
2581///
2582/// If you block the event loop while waiting for a response, the connection cannot process
2583/// the incoming response message, creating a deadlock. This API design prevents that footgun
2584/// by making blocking explicit and encouraging non-blocking patterns.
2585pub struct JrResponse<T> {
2586    method: String,
2587    task_tx: TaskTx,
2588    response_rx: oneshot::Receiver<ResponsePayload>,
2589    to_result: Box<dyn Fn(serde_json::Value) -> Result<T, crate::Error> + Send>,
2590}
2591
2592impl JrResponse<serde_json::Value> {
2593    fn new(
2594        method: String,
2595        task_tx: mpsc::UnboundedSender<Task>,
2596        response_rx: oneshot::Receiver<ResponsePayload>,
2597    ) -> Self {
2598        Self {
2599            method,
2600            response_rx,
2601            task_tx,
2602            to_result: Box::new(Ok),
2603        }
2604    }
2605}
2606
2607impl<T: JrResponsePayload> JrResponse<T> {
2608    /// The method of the request this is in response to.
2609    pub fn method(&self) -> &str {
2610        &self.method
2611    }
2612
2613    /// Create a new response that maps the result of the response to a new type.
2614    pub fn map<U>(
2615        self,
2616        map_fn: impl Fn(T) -> Result<U, crate::Error> + 'static + Send,
2617    ) -> JrResponse<U> {
2618        JrResponse {
2619            method: self.method,
2620            response_rx: self.response_rx,
2621            task_tx: self.task_tx,
2622            to_result: Box::new(move |value| map_fn((self.to_result)(value)?)),
2623        }
2624    }
2625
2626    /// Forward the response (success or error) to a request context when it arrives.
2627    ///
2628    /// This is a convenience method for proxying messages between connections. When the
2629    /// response arrives, it will be automatically sent to the provided request context,
2630    /// whether it's a successful response or an error.
2631    ///
2632    /// # Example: Proxying requests
2633    ///
2634    /// ```
2635    /// # use sacp::link::UntypedLink;
2636    /// # use sacp::{JrConnectionBuilder, JrConnectionCx};
2637    /// # use sacp_test::*;
2638    /// # async fn example(cx: JrConnectionCx<UntypedLink>) -> Result<(), sacp::Error> {
2639    /// // Set up backend connection
2640    /// let backend = UntypedLink::builder()
2641    ///     .on_receive_request(async |req: MyRequest, request_cx, cx| {
2642    ///         request_cx.respond(MyResponse { status: "ok".into() })
2643    ///     }, sacp::on_receive_request!())
2644    ///     .connect_to(MockTransport)?;
2645    ///
2646    /// // Spawn backend and get a context to send to it
2647    /// let backend_cx = cx.spawn_connection(backend, |c| Box::pin(c.serve()))?;
2648    ///
2649    /// // Set up proxy that forwards requests to backend
2650    /// UntypedLink::builder()
2651    ///     .on_receive_request({
2652    ///         let backend_cx = backend_cx.clone();
2653    ///         async move |req: MyRequest, request_cx, cx| {
2654    ///             // Forward the request to backend and proxy the response back
2655    ///             backend_cx.send_request(req)
2656    ///                 .forward_to_request_cx(request_cx)?;
2657    ///             Ok(())
2658    ///         }
2659    ///     }, sacp::on_receive_request!());
2660    /// # Ok(())
2661    /// # }
2662    /// ```
2663    ///
2664    /// # Type Safety
2665    ///
2666    /// The request context's response type must match the request's response type,
2667    /// ensuring type-safe message forwarding.
2668    ///
2669    /// # When to Use
2670    ///
2671    /// Use this when:
2672    /// - You're implementing a proxy or gateway pattern
2673    /// - You want to forward responses without processing them
2674    /// - The response types match between the outgoing request and incoming request
2675    ///
2676    /// This is equivalent to calling `on_receiving_result` and manually forwarding
2677    /// the result, but more concise.
2678    pub fn forward_to_request_cx(self, request_cx: JrRequestCx<T>) -> Result<(), crate::Error>
2679    where
2680        T: Send,
2681    {
2682        self.on_receiving_result(async move |result| request_cx.respond_with_result(result))
2683    }
2684
2685    /// Block the current task until the response is received.
2686    ///
2687    /// **Warning:** This method blocks the current async task. It is **only safe** to use
2688    /// in spawned tasks created with [`JrConnectionCx::spawn`]. Using it directly in a
2689    /// handler callback will deadlock the connection.
2690    ///
2691    /// # Safe Usage (in spawned tasks)
2692    ///
2693    /// ```no_run
2694    /// # use sacp_test::*;
2695    /// # async fn example() -> Result<(), sacp::Error> {
2696    /// # let connection = mock_connection();
2697    /// connection.on_receive_request(async |req: MyRequest, request_cx, cx| {
2698    ///     // Spawn a task to handle the request
2699    ///     cx.spawn({
2700    ///         let connection_cx = cx.clone();
2701    ///         async move {
2702    ///             // Safe: We're in a spawned task, not blocking the event loop
2703    ///             let response = connection_cx.send_request(OtherRequest {})
2704    ///                 .block_task()
2705    ///                 .await?;
2706    ///
2707    ///             // Process the response...
2708    ///             Ok(())
2709    ///         }
2710    ///     })?;
2711    ///
2712    ///     // Respond immediately
2713    ///     request_cx.respond(MyResponse { status: "ok".into() })
2714    /// }, sacp::on_receive_request!())
2715    /// # .serve(sacp_test::MockTransport).await?;
2716    /// # Ok(())
2717    /// # }
2718    /// ```
2719    ///
2720    /// # Unsafe Usage (in handlers - will deadlock!)
2721    ///
2722    /// ```no_run
2723    /// # use sacp_test::*;
2724    /// # async fn example() -> Result<(), sacp::Error> {
2725    /// # let connection = mock_connection();
2726    /// connection.on_receive_request(async |req: MyRequest, request_cx, cx| {
2727    ///     // ❌ DEADLOCK: Handler blocks event loop, which can't process the response
2728    ///     let response = cx.send_request(OtherRequest {})
2729    ///         .block_task()
2730    ///         .await?;
2731    ///
2732    ///     request_cx.respond(MyResponse { status: response.value })
2733    /// }, sacp::on_receive_request!())
2734    /// # .serve(sacp_test::MockTransport).await?;
2735    /// # Ok(())
2736    /// # }
2737    /// ```
2738    ///
2739    /// # When to Use
2740    ///
2741    /// Use this method when:
2742    /// - You're in a spawned task (via [`JrConnectionCx::spawn`])
2743    /// - You need the response value to proceed with your logic
2744    /// - Linear control flow is more natural than callbacks
2745    ///
2746    /// For handler callbacks, use [`on_receiving_result`](Self::on_receiving_result) instead.
2747    pub async fn block_task(self) -> Result<T, crate::Error>
2748    where
2749        T: Send,
2750    {
2751        match self.response_rx.await {
2752            Ok(ResponsePayload {
2753                result: Ok(json_value),
2754                ack_tx,
2755            }) => {
2756                // Ack immediately - we're in a spawned task, so the dispatch loop
2757                // can continue while we process the value.
2758                if let Some(tx) = ack_tx {
2759                    let _ = tx.send(());
2760                }
2761                match (self.to_result)(json_value) {
2762                    Ok(value) => Ok(value),
2763                    Err(err) => Err(err),
2764                }
2765            }
2766            Ok(ResponsePayload {
2767                result: Err(err),
2768                ack_tx,
2769            }) => {
2770                if let Some(tx) = ack_tx {
2771                    let _ = tx.send(());
2772                }
2773                Err(err)
2774            }
2775            Err(err) => Err(crate::util::internal_error(format!(
2776                "response to `{}` never received: {}",
2777                self.method, err
2778            ))),
2779        }
2780    }
2781
2782    /// Schedule an async task to run when a successful response is received.
2783    ///
2784    /// This is a convenience wrapper around [`on_receiving_result`](Self::on_receiving_result)
2785    /// for the common pattern of forwarding errors to a request context while only processing
2786    /// successful responses.
2787    ///
2788    /// # Behavior
2789    ///
2790    /// - If the response is `Ok(value)`, your task receives the value and the request context
2791    /// - If the response is `Err(error)`, the error is automatically sent to `request_cx`
2792    ///   and your task is not called
2793    ///
2794    /// # Example: Chaining requests
2795    ///
2796    /// ```no_run
2797    /// # use sacp_test::*;
2798    /// # async fn example() -> Result<(), sacp::Error> {
2799    /// # let connection = mock_connection();
2800    /// connection.on_receive_request(async |req: ValidateRequest, request_cx, cx| {
2801    ///     // Send initial request
2802    ///     cx.send_request(ValidateRequest { data: req.data.clone() })
2803    ///         .on_receiving_ok_result(request_cx, async |validation, request_cx| {
2804    ///             // Only runs if validation succeeded
2805    ///             if validation.is_valid {
2806    ///                 // Respond to original request
2807    ///                 request_cx.respond(ValidateResponse { is_valid: true, error: None })
2808    ///             } else {
2809    ///                 request_cx.respond_with_error(sacp::util::internal_error("validation failed"))
2810    ///             }
2811    ///         })?;
2812    ///
2813    ///     Ok(())
2814    /// }, sacp::on_receive_request!())
2815    /// # .serve(sacp_test::MockTransport).await?;
2816    /// # Ok(())
2817    /// # }
2818    /// ```
2819    ///
2820    /// # Ordering
2821    ///
2822    /// Like [`on_receiving_result`](Self::on_receiving_result), the callback blocks the
2823    /// dispatch loop until it completes. See the [`ordering`](crate::concepts::ordering) module
2824    /// for details.
2825    ///
2826    /// # When to Use
2827    ///
2828    /// Use this when:
2829    /// - You need to respond to a request based on another request's result
2830    /// - You want errors to automatically propagate to the request context
2831    /// - You only care about the success case
2832    ///
2833    /// For more control over error handling, use [`on_receiving_result`](Self::on_receiving_result).
2834    #[track_caller]
2835    pub fn on_receiving_ok_result<F>(
2836        self,
2837        request_cx: JrRequestCx<T>,
2838        task: impl FnOnce(T, JrRequestCx<T>) -> F + 'static + Send,
2839    ) -> Result<(), crate::Error>
2840    where
2841        F: Future<Output = Result<(), crate::Error>> + 'static + Send,
2842        T: Send,
2843    {
2844        self.on_receiving_result(async move |result| match result {
2845            Ok(value) => task(value, request_cx).await,
2846            Err(err) => request_cx.respond_with_error(err),
2847        })
2848    }
2849
2850    /// Schedule an async task to run when the response is received.
2851    ///
2852    /// This is the recommended way to handle responses in handler callbacks, as it doesn't
2853    /// block the event loop. The task will be spawned automatically when the response arrives.
2854    ///
2855    /// # Example: Handle response in callback
2856    ///
2857    /// ```no_run
2858    /// # use sacp_test::*;
2859    /// # async fn example() -> Result<(), sacp::Error> {
2860    /// # let connection = mock_connection();
2861    /// connection.on_receive_request(async |req: MyRequest, request_cx, cx| {
2862    ///     // Send a request and schedule a callback for the response
2863    ///     cx.send_request(QueryRequest { id: 22 })
2864    ///         .on_receiving_result({
2865    ///             let connection_cx = cx.clone();
2866    ///             async move |result| {
2867    ///                 match result {
2868    ///                     Ok(response) => {
2869    ///                         println!("Got response: {:?}", response);
2870    ///                         // Can send more messages here
2871    ///                         connection_cx.send_notification(QueryComplete {})?;
2872    ///                         Ok(())
2873    ///                 }
2874    ///                     Err(error) => {
2875    ///                         eprintln!("Request failed: {}", error);
2876    ///                         Err(error)
2877    ///                     }
2878    ///                 }
2879    ///             }
2880    ///         })?;
2881    ///
2882    ///     // Handler continues immediately without waiting
2883    ///     request_cx.respond(MyResponse { status: "processing".into() })
2884    /// }, sacp::on_receive_request!())
2885    /// # .serve(sacp_test::MockTransport).await?;
2886    /// # Ok(())
2887    /// # }
2888    /// ```
2889    ///
2890    /// # Ordering
2891    ///
2892    /// The callback runs as a spawned task, but the dispatch loop waits for it to complete
2893    /// before processing the next message. This gives you ordering guarantees: no other
2894    /// messages will be processed while your callback runs.
2895    ///
2896    /// This differs from [`block_task`](Self::block_task), which signals completion immediately
2897    /// upon receiving the response (before your code processes it).
2898    ///
2899    /// See the [`ordering`](crate::concepts::ordering) module for details on ordering guarantees
2900    /// and how to avoid deadlocks.
2901    ///
2902    /// # Error Handling
2903    ///
2904    /// If the scheduled task returns `Err`, the entire server will shut down. Make sure to handle
2905    /// errors appropriately within your task.
2906    ///
2907    /// # When to Use
2908    ///
2909    /// Use this method when:
2910    /// - You're in a handler callback (not a spawned task)
2911    /// - You want ordering guarantees (no other messages processed during your callback)
2912    /// - You need to do async work before "releasing" control back to the dispatch loop
2913    ///
2914    /// For spawned tasks where you don't need ordering guarantees, consider [`block_task`](Self::block_task).
2915    #[track_caller]
2916    pub fn on_receiving_result<F>(
2917        self,
2918        task: impl FnOnce(Result<T, crate::Error>) -> F + 'static + Send,
2919    ) -> Result<(), crate::Error>
2920    where
2921        F: Future<Output = Result<(), crate::Error>> + 'static + Send,
2922        T: Send,
2923    {
2924        let task_tx = self.task_tx.clone();
2925        let method = self.method;
2926        let response_rx = self.response_rx;
2927        let to_result = self.to_result;
2928        let location = Location::caller();
2929
2930        Task::new(location, async move {
2931            match response_rx.await {
2932                Ok(ResponsePayload { result, ack_tx }) => {
2933                    // Convert the result using to_result for Ok values
2934                    let typed_result = match result {
2935                        Ok(json_value) => to_result(json_value),
2936                        Err(err) => Err(err),
2937                    };
2938
2939                    // Run the user's callback
2940                    let outcome = task(typed_result).await;
2941
2942                    // Ack AFTER the callback completes - this is the key difference
2943                    // from block_task. The dispatch loop waits for this ack.
2944                    if let Some(tx) = ack_tx {
2945                        let _ = tx.send(());
2946                    }
2947
2948                    outcome
2949                }
2950                Err(err) => Err(crate::util::internal_error(format!(
2951                    "response to `{}` never received: {}",
2952                    method, err
2953                ))),
2954            }
2955        })
2956        .spawn(&task_tx)
2957    }
2958}
2959
2960// ============================================================================
2961// IntoJrConnectionTransport Implementations
2962// ============================================================================
2963
2964/// A component that communicates over line streams.
2965///
2966/// `Lines` implements the [`Component`] trait for any pair of line-based streams
2967/// (a `Stream<Item = io::Result<String>>` for incoming and a `Sink<String>` for outgoing),
2968/// handling serialization of JSON-RPC messages to/from newline-delimited JSON.
2969///
2970/// This is a lower-level primitive than [`ByteStreams`] that enables interception and
2971/// transformation of individual lines before they are parsed or after they are serialized.
2972/// This is particularly useful for debugging, logging, or implementing custom line-based
2973/// protocols.
2974///
2975/// # Use Cases
2976///
2977/// - **Line-by-line logging**: Intercept and log each line before parsing
2978/// - **Custom protocols**: Transform lines before/after JSON-RPC processing
2979/// - **Debugging**: Inspect raw message strings
2980/// - **Line filtering**: Skip or modify specific messages
2981///
2982/// Most users should use [`ByteStreams`] instead, which provides a simpler interface
2983/// for byte-based I/O.
2984///
2985/// [`Component`]: crate::Component
2986pub struct Lines<OutgoingSink, IncomingStream> {
2987    /// Outgoing line sink (where we write serialized JSON-RPC messages)
2988    pub outgoing: OutgoingSink,
2989    /// Incoming line stream (where we read and parse JSON-RPC messages)
2990    pub incoming: IncomingStream,
2991}
2992
2993impl<OutgoingSink, IncomingStream> Lines<OutgoingSink, IncomingStream>
2994where
2995    OutgoingSink: futures::Sink<String, Error = std::io::Error> + Send + 'static,
2996    IncomingStream: futures::Stream<Item = std::io::Result<String>> + Send + 'static,
2997{
2998    /// Create a new line stream transport.
2999    pub fn new(outgoing: OutgoingSink, incoming: IncomingStream) -> Self {
3000        Self { outgoing, incoming }
3001    }
3002}
3003
3004impl<OutgoingSink, IncomingStream, L: JrLink> Component<L> for Lines<OutgoingSink, IncomingStream>
3005where
3006    OutgoingSink: futures::Sink<String, Error = std::io::Error> + Send + 'static,
3007    IncomingStream: futures::Stream<Item = std::io::Result<String>> + Send + 'static,
3008{
3009    async fn serve(self, client: impl Component<L::ConnectsTo>) -> Result<(), crate::Error> {
3010        let (channel, serve_self) = Component::<L>::into_server(self);
3011        match futures::future::select(Box::pin(client.serve(channel)), serve_self).await {
3012            Either::Left((result, _)) => result,
3013            Either::Right((result, _)) => result,
3014        }
3015    }
3016
3017    fn into_server(self) -> (Channel, BoxFuture<'static, Result<(), crate::Error>>) {
3018        let Self { outgoing, incoming } = self;
3019
3020        // Create a channel pair for the client to use
3021        let (channel_for_caller, channel_for_lines) = Channel::duplex();
3022
3023        // Create the server future that runs the line stream actors
3024        let server_future = Box::pin(async move {
3025            let Channel { rx, tx } = channel_for_lines;
3026
3027            // Run both actors concurrently
3028            let outgoing_future = transport_actor::transport_outgoing_lines_actor(rx, outgoing);
3029            let incoming_future = transport_actor::transport_incoming_lines_actor(incoming, tx);
3030
3031            // Wait for both to complete
3032            futures::try_join!(outgoing_future, incoming_future)?;
3033
3034            Ok(())
3035        });
3036
3037        (channel_for_caller, server_future)
3038    }
3039}
3040
3041/// A component that communicates over byte streams (stdin/stdout, sockets, pipes, etc.).
3042///
3043/// `ByteStreams` implements the [`Component`] trait for any pair of `AsyncRead` and `AsyncWrite`
3044/// streams, handling serialization of JSON-RPC messages to/from newline-delimited JSON.
3045/// This is the standard way to communicate with external processes or network connections.
3046///
3047/// # Use Cases
3048///
3049/// - **Stdio communication**: Connect to agents or proxies via stdin/stdout
3050/// - **Network sockets**: TCP, Unix domain sockets, or other stream-based protocols
3051/// - **Named pipes**: Cross-process communication on the same machine
3052/// - **File I/O**: Reading from and writing to file descriptors
3053///
3054/// # Example
3055///
3056/// Connecting to an agent via stdio:
3057///
3058/// ```no_run
3059/// use sacp::link::UntypedLink;
3060/// # use sacp::{ByteStreams};
3061/// use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
3062///
3063/// # async fn example() -> Result<(), sacp::Error> {
3064/// let component = ByteStreams::new(
3065///     tokio::io::stdout().compat_write(),
3066///     tokio::io::stdin().compat(),
3067/// );
3068///
3069/// // Use as a component in a connection
3070/// sacp::link::UntypedLink::builder()
3071///     .name("my-client")
3072///     .serve(component)
3073///     .await?;
3074/// # Ok(())
3075/// # }
3076/// ```
3077///
3078/// [`Component`]: crate::Component
3079pub struct ByteStreams<OB, IB> {
3080    /// Outgoing byte stream (where we write serialized messages)
3081    pub outgoing: OB,
3082    /// Incoming byte stream (where we read and parse messages)
3083    pub incoming: IB,
3084}
3085
3086impl<OB, IB> ByteStreams<OB, IB>
3087where
3088    OB: AsyncWrite + Send + 'static,
3089    IB: AsyncRead + Send + 'static,
3090{
3091    /// Create a new byte stream transport.
3092    pub fn new(outgoing: OB, incoming: IB) -> Self {
3093        Self { outgoing, incoming }
3094    }
3095}
3096
3097impl<OB, IB, L: JrLink> Component<L> for ByteStreams<OB, IB>
3098where
3099    OB: AsyncWrite + Send + 'static,
3100    IB: AsyncRead + Send + 'static,
3101{
3102    async fn serve(self, client: impl Component<L::ConnectsTo>) -> Result<(), crate::Error> {
3103        let (channel, serve_self) = Component::<L>::into_server(self);
3104        match futures::future::select(pin!(client.serve(channel)), serve_self).await {
3105            Either::Left((result, _)) => result,
3106            Either::Right((result, _)) => result,
3107        }
3108    }
3109
3110    fn into_server(self) -> (Channel, BoxFuture<'static, Result<(), crate::Error>>) {
3111        use futures::AsyncBufReadExt;
3112        use futures::AsyncWriteExt;
3113        use futures::io::BufReader;
3114        let Self { outgoing, incoming } = self;
3115
3116        // Convert byte streams to line streams
3117        // Box both streams to satisfy Unpin requirements
3118        let incoming_lines = Box::pin(BufReader::new(incoming).lines());
3119
3120        // Create a sink that writes lines (with newlines) to the outgoing byte stream
3121        // We need to Box the writer since it may not be Unpin
3122        let outgoing_sink =
3123            futures::sink::unfold(Box::pin(outgoing), async move |mut writer, line: String| {
3124                let mut bytes = line.into_bytes();
3125                bytes.push(b'\n');
3126                writer.write_all(&bytes).await?;
3127                Ok::<_, std::io::Error>(writer)
3128            });
3129
3130        // Delegate to Lines component
3131        Component::<L>::into_server(Lines::new(outgoing_sink, incoming_lines))
3132    }
3133}
3134
3135/// A channel endpoint representing one side of a bidirectional message channel.
3136///
3137/// `Channel` represents a single endpoint's view of a bidirectional communication channel.
3138/// Each endpoint has:
3139/// - `rx`: A receiver for incoming messages (or errors) from the counterpart
3140/// - `tx`: A sender for outgoing messages (or errors) to the counterpart
3141///
3142/// # Example
3143///
3144/// ```no_run
3145/// # use sacp::link::UntypedLink;
3146/// # use sacp::{Channel, JrConnectionBuilder};
3147/// # async fn example() -> Result<(), sacp::Error> {
3148/// // Create a pair of connected channels
3149/// let (channel_a, channel_b) = Channel::duplex();
3150///
3151/// // Each channel can be used by a different component
3152/// UntypedLink::builder()
3153///     .name("connection-a")
3154///     .serve(channel_a)
3155///     .await?;
3156/// # Ok(())
3157/// # }
3158/// ```
3159pub struct Channel {
3160    /// Receives messages (or errors) from the counterpart.
3161    pub rx: mpsc::UnboundedReceiver<Result<jsonrpcmsg::Message, crate::Error>>,
3162    /// Sends messages (or errors) to the counterpart.
3163    pub tx: mpsc::UnboundedSender<Result<jsonrpcmsg::Message, crate::Error>>,
3164}
3165
3166impl Channel {
3167    /// Create a pair of connected channel endpoints.
3168    ///
3169    /// Returns two `Channel` instances that are connected to each other:
3170    /// - Messages sent via `channel_a.tx` are received on `channel_b.rx`
3171    /// - Messages sent via `channel_b.tx` are received on `channel_a.rx`
3172    ///
3173    /// # Returns
3174    ///
3175    /// A tuple `(channel_a, channel_b)` of connected channel endpoints.
3176    pub fn duplex() -> (Self, Self) {
3177        // Create channels: A sends Result<Message> which B receives as Message
3178        let (a_tx, b_rx) = mpsc::unbounded();
3179        let (b_tx, a_rx) = mpsc::unbounded();
3180
3181        let channel_a = Self { rx: a_rx, tx: a_tx };
3182        let channel_b = Self { rx: b_rx, tx: b_tx };
3183
3184        (channel_a, channel_b)
3185    }
3186
3187    /// Copy messages from `rx` to `tx`.
3188    ///
3189    /// # Returns
3190    ///
3191    /// A `Result` indicating success or failure.
3192    pub async fn copy(mut self) -> Result<(), crate::Error> {
3193        while let Some(msg) = self.rx.next().await {
3194            self.tx
3195                .unbounded_send(msg)
3196                .map_err(crate::util::internal_error)?;
3197        }
3198        Ok(())
3199    }
3200}
3201
3202impl<L: JrLink> Component<L> for Channel {
3203    async fn serve(self, client: impl Component<L::ConnectsTo>) -> Result<(), crate::Error> {
3204        let (client_channel, client_serve) = client.into_server();
3205
3206        match futures::try_join!(
3207            Channel {
3208                rx: client_channel.rx,
3209                tx: self.tx
3210            }
3211            .copy(),
3212            Channel {
3213                rx: self.rx,
3214                tx: client_channel.tx
3215            }
3216            .copy(),
3217            client_serve
3218        ) {
3219            Ok(((), (), ())) => Ok(()),
3220            Err(err) => Err(err),
3221        }
3222    }
3223
3224    fn into_server(self) -> (Channel, BoxFuture<'static, Result<(), crate::Error>>) {
3225        (self, Box::pin(future::ready(Ok(()))))
3226    }
3227}