sacp/
jsonrpc.rs

1//! Core JSON-RPC server support.
2
3// Re-export jsonrpcmsg for use in public API
4pub use jsonrpcmsg;
5
6// Types re-exported from crate root
7use serde::{Deserialize, Serialize};
8use std::fmt::Debug;
9use std::panic::Location;
10use std::pin::pin;
11
12use boxfnonce::SendBoxFnOnce;
13use futures::channel::{mpsc, oneshot};
14use futures::future::{self, BoxFuture, Either};
15use futures::{AsyncRead, AsyncWrite, FutureExt, StreamExt};
16
17mod actors;
18pub(crate) mod handlers;
19mod task_actor;
20
21use crate::Component;
22use crate::handler::{ChainedHandler, NamedHandler};
23use crate::jsonrpc::handlers::{MessageHandler, NotificationHandler, NullHandler, RequestHandler};
24use crate::jsonrpc::task_actor::{PendingTask, Task};
25
26/// Handlers are invoked when new messages arrive at the [`JrConnection`].
27/// They have a chance to inspect the method and parameters and decide whether to "claim" the request
28/// (i.e., handle it). If they do not claim it, the request will be passed to the next handler.
29#[allow(async_fn_in_trait)]
30pub trait JrMessageHandler {
31    /// Attempt to claim an incoming message (request or notification).
32    ///
33    /// # Important: do not block
34    ///
35    /// The server will not process new messages until this handler returns.
36    /// You should avoid blocking in this callback unless you wish to block the server (e.g., for rate limiting).
37    /// The recommended approach to manage expensive operations is to the [`JrConnectionCx::spawn`] method available on the message context.
38    ///
39    /// # Parameters
40    ///
41    /// * `cx` - The context of the request. This gives access to the request ID and the method name and is used to send a reply; can also be used to send other messages to the other party.
42    /// * `params` - The parameters of the request.
43    ///
44    /// # Returns
45    ///
46    /// * `Ok(Handled::Yes)` if the message was claimed. It will not be propagated further.
47    /// * `Ok(Handled::No(message))` if not; the (possibly changed) message will be passed to the remaining handlers.
48    /// * `Err` if an internal error occurs (this will bring down the server).
49    async fn handle_message(
50        &mut self,
51        message: MessageAndCx,
52    ) -> Result<Handled<MessageAndCx>, crate::Error>;
53
54    /// Returns a debug description of the handler chain for diagnostics
55    fn describe_chain(&self) -> impl std::fmt::Debug;
56}
57
58/// A JSON-RPC connection that can act as either a server, client, or both.
59///
/// `JrHandlerChain` provides a builder-style API for creating JSON-RPC servers and clients.
61/// You start by calling [`JrHandlerChain::new`], then add message handlers, and finally
62/// drive the connection with either [`serve`](JrHandlerChain::serve) or [`with_client`](JrHandlerChain::with_client),
63/// providing a component implementation (e.g., [`ByteStreams`] for byte streams).
64///
65/// # JSON-RPC Primer
66///
67/// JSON-RPC 2.0 has two fundamental message types:
68///
69/// * **Requests** - Messages that expect a response. They have an `id` field that gets
70///   echoed back in the response so the sender can correlate them.
71/// * **Notifications** - Fire-and-forget messages with no `id` field. The sender doesn't
72///   expect or receive a response.
73///
74/// # Type-Driven Message Dispatch
75///
76/// The handler registration methods use Rust's type system to determine which messages
77/// to handle. The type parameter you provide controls what gets dispatched to your handler:
78///
79/// ## Single Message Types
80///
81/// The simplest case - handle one specific message type:
82///
83/// ```no_run
84/// # use sacp_test::*;
85/// # use sacp::schema::{InitializeRequest, InitializeResponse, SessionNotification};
86/// # async fn example() -> Result<(), sacp::Error> {
87/// # let connection = mock_connection();
88/// connection
89///     .on_receive_request(async |req: InitializeRequest, cx| {
90///         // Handle only InitializeRequest messages
91///         cx.respond(InitializeResponse::make())
92///     })
93///     .on_receive_notification(async |notif: SessionNotification, cx| {
///         // Handle only SessionNotification messages
95///         Ok(())
96///     })
97/// # .serve(sacp_test::MockTransport).await?;
98/// # Ok(())
99/// # }
100/// ```
101///
102/// ## Enum Message Types
103///
104/// You can also handle multiple related messages with a single handler by defining an enum
105/// that implements the appropriate trait ([`JrRequest`] or [`JrNotification`]):
106///
107/// ```no_run
108/// # use sacp_test::*;
109/// # use sacp::{JrRequest, JrMessage, UntypedMessage};
110/// # use sacp::schema::{InitializeRequest, InitializeResponse, PromptRequest, PromptResponse};
111/// # async fn example() -> Result<(), sacp::Error> {
112/// # let connection = mock_connection();
113/// // Define an enum for multiple request types
114/// #[derive(Debug, Clone)]
115/// enum MyRequests {
116///     Initialize(InitializeRequest),
117///     Prompt(PromptRequest),
118/// }
119///
120/// // Implement JrRequest for your enum
121/// # impl JrMessage for MyRequests {
122/// #     fn method(&self) -> &str { "myRequests" }
123/// #     fn to_untyped_message(&self) -> Result<UntypedMessage, sacp::Error> { todo!() }
124/// #     fn parse_request(_method: &str, _params: &impl serde::Serialize) -> Option<Result<Self, sacp::Error>> { None }
125/// #     fn parse_notification(_method: &str, _params: &impl serde::Serialize) -> Option<Result<Self, sacp::Error>> { None }
126/// # }
127/// impl JrRequest for MyRequests { type Response = serde_json::Value; }
128///
129/// // Handle all variants in one place
130/// connection.on_receive_request(async |req: MyRequests, cx| {
131///     match req {
132///         MyRequests::Initialize(init) => { cx.respond(serde_json::json!({})) }
133///         MyRequests::Prompt(prompt) => { cx.respond(serde_json::json!({})) }
134///     }
135/// })
136/// # .serve(sacp_test::MockTransport).await?;
137/// # Ok(())
138/// # }
139/// ```
140///
141/// ## Mixed Message Types
142///
143/// For enums containing both requests AND notifications, use [`on_receive_message`](Self::on_receive_message):
144///
145/// ```no_run
146/// # use sacp_test::*;
147/// # use sacp::MessageAndCx;
148/// # use sacp::schema::{InitializeRequest, InitializeResponse, SessionNotification};
149/// # async fn example() -> Result<(), sacp::Error> {
150/// # let connection = mock_connection();
151/// // on_receive_message receives MessageAndCx which can be either a request or notification
152/// connection.on_receive_message(async |msg: MessageAndCx<InitializeRequest, SessionNotification>| {
153///     match msg {
154///         MessageAndCx::Request(req, request_cx) => {
155///             request_cx.respond(InitializeResponse::make())
156///         }
157///         MessageAndCx::Notification(notif, _cx) => {
158///             Ok(())
159///         }
160///     }
161/// })
162/// # .serve(sacp_test::MockTransport).await?;
163/// # Ok(())
164/// # }
165/// ```
166///
167/// # Handler Registration
168///
169/// Register handlers using these methods (listed from most common to most flexible):
170///
171/// * [`on_receive_request`](Self::on_receive_request) - Handle JSON-RPC requests (messages expecting responses)
172/// * [`on_receive_notification`](Self::on_receive_notification) - Handle JSON-RPC notifications (fire-and-forget)
173/// * [`on_receive_message`](Self::on_receive_message) - Handle enums containing both requests and notifications
174/// * [`with_handler`](Self::with_handler) - Low-level primitive for maximum flexibility
175///
176/// ## Handler Ordering
177///
178/// Handlers are tried in the order you register them. The first handler that claims a message
179/// (by matching its type) will process it. Subsequent handlers won't see that message:
180///
181/// ```no_run
182/// # use sacp_test::*;
183/// # use sacp::{MessageAndCx, UntypedMessage};
184/// # use sacp::schema::{InitializeRequest, InitializeResponse, PromptRequest, PromptResponse};
185/// # async fn example() -> Result<(), sacp::Error> {
186/// # let connection = mock_connection();
187/// connection
188///     .on_receive_request(async |req: InitializeRequest, cx| {
189///         // This runs first for InitializeRequest
190///         cx.respond(InitializeResponse::make())
191///     })
192///     .on_receive_request(async |req: PromptRequest, cx| {
193///         // This runs first for PromptRequest
194///         cx.respond(PromptResponse::make())
195///     })
196///     .on_receive_message(async |msg: MessageAndCx| {
197///         // This runs for any message not handled above
198///         msg.respond_with_error(sacp::util::internal_error("unknown method"))
199///     })
200/// # .serve(sacp_test::MockTransport).await?;
201/// # Ok(())
202/// # }
203/// ```
204///
205/// # Event Loop and Concurrency
206///
207/// Understanding the event loop is critical for writing correct handlers.
208///
209/// ## The Event Loop
210///
211/// `JrConnection` runs all handler callbacks on a single async task - the event loop.
212/// While a handler is running, **the server cannot receive new messages**. This means
213/// any blocking or expensive work in your handlers will stall the entire connection.
214///
215/// To avoid blocking the event loop, use [`JrConnectionCx::spawn`] to offload serious
216/// work to concurrent tasks:
217///
218/// ```no_run
219/// # use sacp_test::*;
220/// # async fn example() -> Result<(), sacp::Error> {
221/// # let connection = mock_connection();
222/// connection.on_receive_request(async |req: AnalyzeRequest, cx| {
223///     // Clone cx for the spawned task
224///     cx.connection_cx().spawn({
225///         let connection_cx = cx.connection_cx();
226///         async move {
227///             let result = expensive_analysis(&req.data).await?;
228///             connection_cx.send_notification(AnalysisComplete { result })?;
229///             Ok(())
230///         }
231///     })?;
232///
233///     // Respond immediately without blocking
234///     cx.respond(AnalysisStarted { job_id: 42 })
235/// })
236/// # .serve(sacp_test::MockTransport).await?;
237/// # Ok(())
238/// # }
239/// ```
240///
241/// Note that the entire connection runs within one async task, so parallelism must be
242/// managed explicitly using [`spawn`](JrConnectionCx::spawn).
243///
244/// ## The Connection Context
245///
246/// Handler callbacks receive a context object (`cx`) for interacting with the connection:
247///
248/// * **For request handlers** - [`JrRequestCx<R>`] provides [`respond`](JrRequestCx::respond)
249///   to send the response, plus methods to send other messages
250/// * **For notification handlers** - [`JrConnectionCx`] provides methods to send messages
251///   and spawn tasks
252///
253/// Both context types support:
254/// * [`send_request`](JrConnectionCx::send_request) - Send requests to the other side
255/// * [`send_notification`](JrConnectionCx::send_notification) - Send notifications
256/// * [`spawn`](JrConnectionCx::spawn) - Run tasks concurrently without blocking the event loop
257///
258/// The [`JrResponse`] returned by `send_request` provides methods like
259/// [`await_when_result_received`](JrResponse::await_when_result_received) that help you
260/// avoid accidentally blocking the event loop while waiting for responses.
261///
262/// # Driving the Connection
263///
264/// After adding handlers, you must drive the connection using one of two modes:
265///
266/// ## Server Mode: `serve()`
267///
268/// Use [`serve`](Self::serve) when you only need to respond to incoming messages:
269///
270/// ```no_run
271/// # use sacp_test::*;
272/// # async fn example() -> Result<(), sacp::Error> {
273/// # let connection = mock_connection();
274/// connection
275///     .on_receive_request(async |req: MyRequest, cx| {
276///         cx.respond(MyResponse { status: "ok".into() })
277///     })
278///     .serve(MockTransport)  // Runs until connection closes or error occurs
279///     .await?;
280/// # Ok(())
281/// # }
282/// ```
283///
284/// The connection will process incoming messages and invoke your handlers until the
285/// connection is closed or an error occurs.
286///
287/// ## Client Mode: `with_client()`
288///
289/// Use [`with_client`](Self::with_client) when you need to both handle incoming messages
290/// AND send your own requests/notifications:
291///
292/// ```no_run
293/// # use sacp_test::*;
294/// # use sacp::schema::InitializeRequest;
295/// # async fn example() -> Result<(), sacp::Error> {
296/// # let connection = mock_connection();
297/// connection
298///     .on_receive_request(async |req: MyRequest, cx| {
299///         cx.respond(MyResponse { status: "ok".into() })
300///     })
301///     .with_client(MockTransport, async |cx| {
302///         // You can send requests to the other side
303///         let response = cx.send_request(InitializeRequest::make())
304///             .block_task()
305///             .await?;
306///
307///         // And send notifications
308///         cx.send_notification(StatusUpdate { message: "ready".into() })?;
309///
310///         Ok(())
311///     })
312///     .await?;
313/// # Ok(())
314/// # }
315/// ```
316///
317/// The connection will serve incoming messages in the background while your client closure
318/// runs. When the closure returns, the connection shuts down.
319///
320/// # Example: Complete Agent
321///
322/// ```no_run
323/// # use sacp::JrHandlerChain;
324/// # use sacp::ByteStreams;
325/// # use sacp::schema::{InitializeRequest, InitializeResponse, PromptRequest, PromptResponse, SessionNotification};
326/// # use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
327/// # async fn example() -> Result<(), sacp::Error> {
328/// let transport = ByteStreams::new(
329///     tokio::io::stdout().compat_write(),
330///     tokio::io::stdin().compat(),
331/// );
332///
333/// JrHandlerChain::new()
334///     .name("my-agent")  // Optional: for debugging logs
335///     .on_receive_request(async |init: InitializeRequest, cx| {
336///         let response: InitializeResponse = todo!();
337///         cx.respond(response)
338///     })
339///     .on_receive_request(async |prompt: PromptRequest, cx| {
340///         // You can send notifications while processing a request
341///         let notif: SessionNotification = todo!();
342///         cx.connection_cx().send_notification(notif)?;
343///
344///         // Then respond to the request
345///         let response: PromptResponse = todo!();
346///         cx.respond(response)
347///     })
348///     .serve(transport)
349///     .await?;
350/// # Ok(())
351/// # }
352/// ```
353#[must_use]
354pub struct JrHandlerChain<H: JrMessageHandler> {
355    name: Option<String>,
356
357    /// Handler for incoming messages.
358    handler: H,
359
360    /// Pending tasks
361    pending_tasks: Vec<PendingTask>,
362}
363
364impl JrHandlerChain<NullHandler> {
365    /// Create a new JrConnection.
366    /// This type follows a builder pattern; use other methods to configure and then invoke
367    /// [`Self::serve`] (to use as a server) or [`Self::with_client`] to use as a client.
368    pub fn new() -> Self {
369        Self::new_with(NullHandler::default())
370    }
371}
372
373impl<H: JrMessageHandler> JrHandlerChain<H> {
374    /// Create a new handler chain with the given handler.
375    pub fn new_with(handler: H) -> Self {
376        Self {
377            name: Default::default(),
378            handler,
379            pending_tasks: Default::default(),
380        }
381    }
382
383    /// Set the "name" of this connection -- used only for debugging logs.
384    pub fn name(mut self, name: impl ToString) -> Self {
385        self.name = Some(name.to_string());
386        self
387    }
388
389    /// Add a new [`JrMessageHandler`] to the chain.
390    ///
391    /// Prefer [`Self::on_receive_request`] or [`Self::on_receive_notification`].
392    /// This is a low-level method that is not intended for general use.
393    pub fn with_handler_chain<H1>(
394        mut self,
395        handler_chain: JrHandlerChain<H1>,
396    ) -> JrHandlerChain<ChainedHandler<H, NamedHandler<H1>>>
397    where
398        H1: JrMessageHandler,
399    {
400        self.pending_tasks.extend(
401            handler_chain
402                .pending_tasks
403                .into_iter()
404                .map(|t| t.named(handler_chain.name.clone())),
405        );
406
407        JrHandlerChain {
408            name: self.name,
409            handler: ChainedHandler::new(
410                self.handler,
411                NamedHandler::new(handler_chain.name, handler_chain.handler),
412            ),
413            pending_tasks: self.pending_tasks,
414        }
415    }
416
417    /// Add a new [`JrMessageHandler`] to the chain.
418    ///
419    /// Prefer [`Self::on_receive_request`] or [`Self::on_receive_notification`].
420    /// This is a low-level method that is not intended for general use.
421    pub fn with_handler<H1>(self, handler: H1) -> JrHandlerChain<ChainedHandler<H, H1>>
422    where
423        H1: JrMessageHandler,
424    {
425        JrHandlerChain {
426            name: self.name,
427            handler: ChainedHandler::new(self.handler, handler),
428            pending_tasks: self.pending_tasks,
429        }
430    }
431
432    /// Enqueue a task to run once the connection is actively serving traffic.
433    #[track_caller]
434    pub fn with_spawned<F>(
435        mut self,
436        task: impl FnOnce(JrConnectionCx) -> F + Send + 'static,
437    ) -> Self
438    where
439        F: Future<Output = Result<(), crate::Error>> + Send + 'static,
440    {
441        let location = Location::caller();
442        self.pending_tasks.push(PendingTask::new(location, task));
443        self
444    }
445
446    /// Register a handler for messages that can be either requests OR notifications.
447    ///
448    /// Use this when you want to handle an enum type that contains both request and
449    /// notification variants. Your handler receives a [`MessageAndCx<R, N>`] which
450    /// is an enum with two variants:
451    ///
452    /// - `MessageAndCx::Request(request, request_cx)` - A request with its response context
453    /// - `MessageAndCx::Notification(notification, cx)` - A notification with the connection context
454    ///
455    /// # Example
456    ///
457    /// ```no_run
458    /// # use sacp_test::*;
459    /// # use sacp::MessageAndCx;
460    /// # async fn example() -> Result<(), sacp::Error> {
461    /// # let connection = mock_connection();
462    /// connection.on_receive_message(async |message: MessageAndCx<MyRequest, StatusUpdate>| {
463    ///     match message {
464    ///         MessageAndCx::Request(req, request_cx) => {
465    ///             // Handle request and send response
466    ///             request_cx.respond(MyResponse { status: "ok".into() })
467    ///         }
468    ///         MessageAndCx::Notification(notif, _cx) => {
469    ///             // Handle notification (no response needed)
470    ///             Ok(())
471    ///         }
472    ///     }
473    /// })
474    /// # .serve(sacp_test::MockTransport).await?;
475    /// # Ok(())
476    /// # }
477    /// ```
478    ///
479    /// For most use cases, prefer [`on_receive_request`](Self::on_receive_request) or
480    /// [`on_receive_notification`](Self::on_receive_notification) which provide cleaner APIs
481    /// for handling requests or notifications separately.
482    pub fn on_receive_message<R, N, F, T>(
483        self,
484        op: F,
485    ) -> JrHandlerChain<ChainedHandler<H, MessageHandler<R, N, F>>>
486    where
487        R: JrRequest,
488        N: JrNotification,
489        F: AsyncFnMut(MessageAndCx<R, N>) -> Result<T, crate::Error> + Send,
490        T: IntoHandled<MessageAndCx<R, N>>,
491    {
492        self.with_handler(MessageHandler::new(op))
493    }
494
495    /// Register a handler for JSON-RPC requests of type `R`.
496    ///
497    /// Your handler receives two arguments:
498    /// 1. The request (type `R`)
499    /// 2. A [`JrRequestCx<R::Response>`] for sending the response
500    ///
501    /// The request context allows you to:
502    /// - Send the response with [`JrRequestCx::respond`]
503    /// - Send notifications to the client with [`JrConnectionCx::send_notification`]
504    /// - Send requests to the client with [`JrConnectionCx::send_request`]
505    ///
506    /// # Example
507    ///
508    /// ```
509    /// # use sacp::JrHandlerChain;
510    /// # use sacp::schema::{PromptRequest, PromptResponse, SessionNotification};
511    /// # fn example(connection: JrHandlerChain<impl sacp::JrMessageHandler>) {
512    /// connection.on_receive_request(async |request: PromptRequest, request_cx| {
513    ///     // Send a notification while processing
514    ///     let notif: SessionNotification = todo!();
515    ///     request_cx.connection_cx().send_notification(notif)?;
516    ///
517    ///     // Do some work...
518    ///     let result = todo!("process the prompt");
519    ///
520    ///     // Send the response
521    ///     let response: PromptResponse = todo!();
522    ///     request_cx.respond(response)
523    /// });
524    /// # }
525    /// ```
526    ///
527    /// # Type Parameter
528    ///
529    /// `R` can be either a single request type or an enum of multiple request types.
530    /// See the [type-driven dispatch](Self#type-driven-message-dispatch) section for details.
531    pub fn on_receive_request<R, F, T>(
532        self,
533        op: F,
534    ) -> JrHandlerChain<ChainedHandler<H, RequestHandler<R, F>>>
535    where
536        R: JrRequest,
537        F: AsyncFnMut(R, JrRequestCx<R::Response>) -> Result<T, crate::Error> + Send,
538        T: IntoHandled<(R, JrRequestCx<R::Response>)>,
539    {
540        self.with_handler(RequestHandler::new(op))
541    }
542
543    /// Register a handler for JSON-RPC notifications of type `N`.
544    ///
545    /// Notifications are fire-and-forget messages that don't expect a response.
546    /// Your handler receives:
547    /// 1. The notification (type `N`)
548    /// 2. A [`JrConnectionCx`] for sending messages to the other side
549    ///
550    /// Unlike request handlers, you cannot send a response (notifications don't have IDs),
551    /// but you can still send your own requests and notifications using the context.
552    ///
553    /// # Example
554    ///
555    /// ```no_run
556    /// # use sacp_test::*;
557    /// # async fn example() -> Result<(), sacp::Error> {
558    /// # let connection = mock_connection();
559    /// connection.on_receive_notification(async |notif: SessionUpdate, cx| {
560    ///     // Process the notification
561    ///     update_session_state(&notif)?;
562    ///
563    ///     // Optionally send a notification back
564    ///     cx.send_notification(StatusUpdate {
565    ///         message: "Acknowledged".into(),
566    ///     })?;
567    ///
568    ///     Ok(())
569    /// })
570    /// # .serve(sacp_test::MockTransport).await?;
571    /// # Ok(())
572    /// # }
573    /// ```
574    ///
575    /// # Type Parameter
576    ///
577    /// `N` can be either a single notification type or an enum of multiple notification types.
578    /// See the [type-driven dispatch](Self#type-driven-message-dispatch) section for details.
579    pub fn on_receive_notification<N, F, T>(
580        self,
581        op: F,
582    ) -> JrHandlerChain<ChainedHandler<H, NotificationHandler<N, F>>>
583    where
584        N: JrNotification,
585        F: AsyncFnMut(N, JrConnectionCx) -> Result<T, crate::Error> + Send,
586        T: IntoHandled<(N, JrConnectionCx)>,
587    {
588        self.with_handler(NotificationHandler::new(op))
589    }
590
591    /// Connect these handlers to a transport layer.
592    /// The resulting connection must then be either [served](`JrConnection::serve`) or [used as a client](`JrConnection::with_client`).
593    pub fn connect_to(
594        self,
595        transport: impl Component + 'static,
596    ) -> Result<JrConnection<H>, crate::Error> {
597        let Self {
598            name,
599            handler,
600            pending_tasks,
601        } = self;
602
603        let (outgoing_tx, outgoing_rx) = mpsc::unbounded();
604        let (new_task_tx, new_task_rx) = mpsc::unbounded();
605        let cx = JrConnectionCx::new(outgoing_tx, new_task_tx);
606
607        // Convert transport into server - this returns a channel for us to use
608        // and a future that runs the transport
609        let transport_component = crate::DynComponent::new(transport);
610        let (transport_channel, transport_future) = transport_component.into_server();
611        cx.spawn(transport_future)?;
612
613        // Destructure the channel endpoints
614        let Channel {
615            rx: transport_incoming_rx,
616            tx: transport_outgoing_tx,
617        } = transport_channel;
618
619        // Spawn pending tasks
620        for pending_task in pending_tasks {
621            let task = pending_task.into_task(cx.clone());
622            cx.spawn_task(task)?;
623        }
624
625        Ok(JrConnection {
626            cx,
627            name,
628            outgoing_rx,
629            new_task_rx,
630            transport_outgoing_tx,
631            transport_incoming_rx,
632            handler,
633        })
634    }
635
636    /// Apply the handler chain to a single message.
637    ///
638    /// This method processes one message through the entire handler chain, attempting to
639    /// match it against each registered handler in order. This is useful when implementing
640    /// custom message handling logic or when you need fine-grained control over message
641    /// processing.
642    ///
643    /// # Returns
644    ///
    /// - `Ok(Handled::Yes)` - A handler claimed and processed the message
    /// - `Ok(Handled::No(message))` - No handler matched the message
647    /// - `Err(_)` - A handler encountered an error while processing
648    ///
649    /// # Borrow Checker Considerations
650    ///
651    /// You may find that [`MatchMessage`] is a better choice than this method
652    /// for implementing custom handlers. It offers a very similar API to
653    /// [`JrHandlerChain`] but is structured to apply each test one at a time
654    /// (sequentially) instead of setting them all up at once. This sequential approach
655    /// often interacts better with the borrow checker, at the cost of requiring `.await`
656    /// calls between each handler and only working for processing a single message.
657    ///
658    /// # Example: Borrow Checker Challenges
659    ///
660    /// When building a handler chain with `async {}` blocks (non-move), you might encounter
661    /// borrow checker errors if multiple handlers need access to the same mutable state:
662    ///
663    /// ```compile_fail
664    /// # use sacp::{JrHandlerChain, JrRequestCx};
665    /// # use sacp::schema::{InitializeRequest, InitializeResponse};
666    /// # use sacp::schema::{PromptRequest, PromptResponse};
667    /// # async fn example() -> Result<(), sacp::Error> {
668    /// let mut state = String::from("initial");
669    ///
670    /// // This fails to compile because both handlers borrow `state` mutably,
671    /// // and the futures are set up at the same time (even though only one will run)
672    /// let chain = JrHandlerChain::new()
673    ///     .on_receive_request(async |req: InitializeRequest, cx: JrRequestCx| {
674    ///         state.push_str(" - initialized");  // First mutable borrow
675    ///         cx.respond(InitializeResponse::make())
676    ///     })
677    ///     .on_receive_request(async |req: PromptRequest, cx: JrRequestCx| {
678    ///         state.push_str(" - prompted");  // Second mutable borrow - ERROR!
679    ///         cx.respond(PromptResponse { content: vec![], stopReason: None })
680    ///     });
681    /// # Ok(())
682    /// # }
683    /// ```
684    ///
685    /// You can work around this by using `apply()` to process messages one at a time,
686    /// or use [`MatchMessage`] which provides a similar API but applies handlers sequentially:
687    ///
688    /// ```ignore
689    /// use sacp::{MessageAndCx, Handled};
690    /// use sacp::util::MatchMessage;
691    ///
692    /// async fn handle_with_state(
693    ///     message: MessageAndCx,
694    ///     state: &mut String,
695    /// ) -> Result<Handled<MessageAndCx>, sacp::Error> {
696    ///     MatchMessage::new(message)
697    ///         .on_request(async |req: InitializeRequest, cx| {
698    ///             state.push_str(" - initialized");  // Sequential - OK!
699    ///             cx.respond(InitializeResponse::make())
700    ///         })
701    ///         .on_request(async |req: PromptRequest, cx| {
702    ///             state.push_str(" - prompted");  // Sequential - OK!
703    ///             cx.respond(PromptResponse { content: vec![], stopReason: None })
704    ///         })
    ///         .otherwise(async |msg| Ok(Handled::No(msg)))
706    ///         .await
707    /// }
708    /// ```
709    pub async fn apply(
710        &mut self,
711        message: MessageAndCx,
712    ) -> Result<Handled<MessageAndCx>, crate::Error> {
713        self.handler.handle_message(message).await
714    }
715
716    /// Convenience method to connect to a transport and serve.
717    ///
718    /// This is equivalent to:
719    /// ```ignore
720    /// handler_chain.connect_to(transport)?.serve().await
721    /// ```
722    pub async fn serve(self, transport: impl Component + 'static) -> Result<(), crate::Error> {
723        self.connect_to(transport)?.serve().await
724    }
725
726    /// Convenience method to connect to a transport and run a client function.
727    ///
728    /// This is equivalent to:
729    /// ```ignore
730    /// handler_chain.connect_to(transport)?.with_client(main_fn).await
731    /// ```
732    pub async fn with_client(
733        self,
734        transport: impl Component + 'static,
735        main_fn: impl AsyncFnOnce(JrConnectionCx) -> Result<(), crate::Error>,
736    ) -> Result<(), crate::Error> {
737        self.connect_to(transport)?.with_client(main_fn).await
738    }
739}
740
741/// A JSON-RPC connection with an active transport.
742///
743/// This type represents a `JrHandlerChain` that has been connected to a transport
744/// via `connect_to()`. It can be driven in two modes:
745///
746/// - [`serve()`](Self::serve) - Run as a server, handling incoming messages until the connection closes
747/// - [`with_client()`](Self::with_client) - Run with client logic, allowing you to send requests/notifications
748///
749/// Most users won't construct this directly - instead use `JrHandlerChain::connect_to()` or
750/// `JrHandlerChain::serve()` for convenience.
751pub struct JrConnection<H: JrMessageHandler> {
752    cx: JrConnectionCx,
753    name: Option<String>,
754    outgoing_rx: mpsc::UnboundedReceiver<OutgoingMessage>,
755    new_task_rx: mpsc::UnboundedReceiver<Task>,
756    transport_outgoing_tx: mpsc::UnboundedSender<Result<jsonrpcmsg::Message, crate::Error>>,
757    transport_incoming_rx: mpsc::UnboundedReceiver<Result<jsonrpcmsg::Message, crate::Error>>,
758    handler: H,
759}
760
761impl<H: JrMessageHandler> JrConnection<H> {
    /// Run the connection in server mode over the transport it was connected to.
763    ///
764    /// This drives the connection by continuously processing messages from the transport
765    /// and dispatching them to your registered handlers. The connection will run until:
766    /// - The transport closes (e.g., EOF on byte streams)
767    /// - An error occurs
768    /// - One of your handlers returns an error
769    ///
770    /// The transport is responsible for serializing and deserializing `jsonrpcmsg::Message`
771    /// values to/from the underlying I/O mechanism (byte streams, channels, etc.).
772    ///
773    /// Use this mode when you only need to respond to incoming messages and don't need
774    /// to initiate your own requests. If you need to send requests to the other side,
775    /// use [`with_client`](Self::with_client) instead.
776    ///
777    /// # Example: Byte Stream Transport
778    ///
779    /// ```no_run
780    /// # use sacp::JrHandlerChain;
781    /// # use sacp::ByteStreams;
782    /// # use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
783    /// # use sacp_test::*;
784    /// # async fn example() -> Result<(), sacp::Error> {
785    /// let transport = ByteStreams::new(
786    ///     tokio::io::stdout().compat_write(),
787    ///     tokio::io::stdin().compat(),
788    /// );
789    ///
790    /// JrHandlerChain::new()
791    ///     .on_receive_request(async |req: MyRequest, cx| {
792    ///         cx.respond(MyResponse { status: "ok".into() })
793    ///     })
794    ///     .serve(transport)
795    ///     .await?;
796    /// # Ok(())
797    /// # }
798    /// ```
799    pub async fn serve(self) -> Result<(), crate::Error> {
800        self.with_client(async move |_cx| future::pending().await)
801            .await
802    }
803
804    /// Run the connection in client mode, both handling incoming messages and sending your own.
805    ///
806    /// This drives the connection by:
807    /// 1. Running your registered handlers in the background to process incoming messages
808    /// 2. Executing your `main_fn` closure with a [`JrConnectionCx`] for sending requests/notifications
809    ///
810    /// The connection stays active until your `main_fn` returns, then shuts down gracefully.
811    /// If the connection closes unexpectedly before `main_fn` completes, this returns an error.
812    ///
813    /// Use this mode when you need to initiate communication (send requests/notifications)
814    /// in addition to responding to incoming messages. For server-only mode where you just
815    /// respond to messages, use [`serve`](Self::serve) instead.
816    ///
817    /// # Example
818    ///
819    /// ```no_run
820    /// # use sacp::JrHandlerChain;
821    /// # use sacp::ByteStreams;
822    /// # use sacp::schema::InitializeRequest;
823    /// # use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
824    /// # use sacp_test::*;
825    /// # async fn example() -> Result<(), sacp::Error> {
826    /// let transport = ByteStreams::new(
827    ///     tokio::io::stdout().compat_write(),
828    ///     tokio::io::stdin().compat(),
829    /// );
830    ///
831    /// JrHandlerChain::new()
832    ///     .on_receive_request(async |req: MyRequest, cx| {
833    ///         // Handle incoming requests in the background
834    ///         cx.respond(MyResponse { status: "ok".into() })
835    ///     })
836    ///     .connect_to(transport)?
837    ///     .with_client(async |cx| {
838    ///         // Initialize the protocol
839    ///         let init_response = cx.send_request(InitializeRequest::make())
840    ///             .block_task()
841    ///             .await?;
842    ///
843    ///         // Send more requests...
844    ///         let result = cx.send_request(MyRequest {})
845    ///             .block_task()
846    ///             .await?;
847    ///
848    ///         // When this closure returns, the connection shuts down
849    ///         Ok(())
850    ///     })
851    ///     .await?;
852    /// # Ok(())
853    /// # }
854    /// ```
855    ///
856    /// # Parameters
857    ///
858    /// - `transport`: The transport implementation for sending/receiving messages
859    /// - `main_fn`: Your client logic. Receives a [`JrConnectionCx`] for sending messages.
860    ///
861    /// # Errors
862    ///
863    /// Returns an error if the connection closes before `main_fn` completes.
864    pub async fn with_client(
865        self,
866        main_fn: impl AsyncFnOnce(JrConnectionCx) -> Result<(), crate::Error>,
867    ) -> Result<(), crate::Error> {
868        let JrConnection {
869            cx,
870            name,
871            outgoing_rx,
872            new_task_rx,
873            handler,
874            transport_outgoing_tx,
875            transport_incoming_rx,
876        } = self;
877        let (reply_tx, reply_rx) = mpsc::unbounded();
878
879        crate::util::instrument_with_connection_name(name, async move {
880            futures::select!(
881                // Protocol layer: OutgoingMessage → jsonrpcmsg::Message
882                r = actors::outgoing_protocol_actor(
883                    outgoing_rx,
884                    reply_tx.clone(),
885                    transport_outgoing_tx,
886                ).fuse() => {
887                    tracing::trace!(?r, "outgoing protocol actor terminated");
888                    r?;
889                }
890                // Protocol layer: jsonrpcmsg::Message → handler/reply routing
891                r = actors::incoming_protocol_actor(
892                    &cx,
893                    transport_incoming_rx,
894                    reply_tx,
895                    handler,
896                ).fuse() => {
897                    tracing::trace!(?r, "incoming protocol actor terminated");
898                    r?;
899                }
900                r = actors::reply_actor(reply_rx).fuse() => {
901                    tracing::trace!(?r, "reply actor terminated");
902                    r?;
903                }
904                r = task_actor::task_actor(new_task_rx).fuse() => {
905                    tracing::trace!(?r, "task actor terminated");
906                    r?;
907                }
908                r = main_fn(cx.clone()).fuse() => {
909                    tracing::trace!(?r, "main actor terminated");
910                    r?;
911                }
912            );
913            Ok(())
914        })
915        .await
916    }
917}
918
919/// Message sent to the reply management actor
920enum ReplyMessage {
921    /// Wait for a response to the given id and then send it to the given receiver
922    Subscribe(
923        jsonrpcmsg::Id,
924        oneshot::Sender<Result<serde_json::Value, crate::Error>>,
925    ),
926
927    /// Dispatch a response to the given id and value
928    Dispatch(jsonrpcmsg::Id, Result<serde_json::Value, crate::Error>),
929}
930
931/// Messages send to be serialized over the transport.
932#[derive(Debug)]
933enum OutgoingMessage {
934    /// Send a request to the server.
935    Request {
936        /// method to use in the request
937        method: String,
938
939        /// parameters for the request
940        params: Option<jsonrpcmsg::Params>,
941
942        /// where to send the response when it arrives
943        response_tx: oneshot::Sender<Result<serde_json::Value, crate::Error>>,
944    },
945
946    /// Send a notification to the server.
947    Notification {
948        /// method to use in the request
949        method: String,
950
951        /// parameters for the request
952        params: Option<jsonrpcmsg::Params>,
953    },
954
955    /// Send a reponse to a message from the server
956    Response {
957        id: jsonrpcmsg::Id,
958
959        response: Result<serde_json::Value, crate::Error>,
960    },
961
962    /// Send a generalized error message
963    Error { error: crate::Error },
964}
965
/// Return type from a [`JrMessageHandler`]; indicates whether the message was handled or not.
#[must_use]
pub enum Handled<T> {
    /// The message was handled and will not be propagated further.
    Yes,
    /// The message was not handled; carries the (possibly changed) message back so
    /// it can be passed to the remaining handlers.
    No(T),
}

/// Trait for converting handler return values into [`Handled`].
///
/// This trait allows handlers to return either `()` (which becomes `Handled::Yes`)
/// or an explicit `Handled<T>` value for more control over handler chain propagation.
pub trait IntoHandled<T> {
    /// Convert this value into a `Handled<T>`.
    fn into_handled(self) -> Handled<T>;
}

/// Returning `()` from a handler means "the message was handled".
impl<T> IntoHandled<T> for () {
    fn into_handled(self) -> Handled<T> {
        Handled::Yes
    }
}

/// An explicit `Handled<T>` is passed through unchanged.
impl<T> IntoHandled<T> for Handled<T> {
    fn into_handled(self) -> Handled<T> {
        self
    }
}
995
996/// Trait for types that can provide transport for JSON-RPC messages.
997///
998/// Implementations of this trait bridge between the internal protocol channels
999/// (which carry `jsonrpcmsg::Message`) and the actual I/O mechanism (byte streams,
1000/// in-process channels, network sockets, etc.).
1001///
1002/// The transport layer is responsible only for moving `jsonrpcmsg::Message` in and out.
1003/// It has no knowledge of protocol semantics like request/response correlation, ID assignment,
1004/// or handler dispatch - those are handled by the protocol layer in `JrConnection`.
1005///
1006/// # Example
1007///
1008/// See [`ByteStreams`] for the standard byte stream implementation.
1009
1010/// Connection context for sending messages and spawning tasks.
1011///
1012/// This is the primary handle for interacting with the JSON-RPC connection from
1013/// within handler callbacks. You can use it to:
1014///
1015/// * Send requests and notifications to the other side
1016/// * Spawn concurrent tasks that run alongside the connection
1017/// * Respond to requests (via [`JrRequestCx`] which wraps this)
1018///
1019/// # Cloning
1020///
1021/// `JrConnectionCx` is cheaply cloneable - all clones refer to the same underlying connection.
1022/// This makes it easy to share across async tasks.
1023///
1024/// # Event Loop and Concurrency
1025///
1026/// Handler callbacks run on the event loop, which means the connection cannot process new
1027/// messages while your handler is running. Use [`spawn`](Self::spawn) to offload any
1028/// expensive or blocking work to concurrent tasks.
1029///
1030/// See the [Event Loop and Concurrency](JrConnection#event-loop-and-concurrency) section
1031/// for more details.
1032#[derive(Clone, Debug)]
1033pub struct JrConnectionCx {
1034    message_tx: mpsc::UnboundedSender<OutgoingMessage>,
1035    task_tx: mpsc::UnboundedSender<Task>,
1036}
1037
1038impl JrConnectionCx {
1039    fn new(
1040        tx: mpsc::UnboundedSender<OutgoingMessage>,
1041        task_tx: mpsc::UnboundedSender<Task>,
1042    ) -> Self {
1043        Self {
1044            message_tx: tx,
1045            task_tx,
1046        }
1047    }
1048
1049    /// Spawns a task that will run so long as the JSON-RPC connection is being served.
1050    ///
1051    /// This is the primary mechanism for offloading expensive work from handler callbacks
1052    /// to avoid blocking the event loop. Spawned tasks run concurrently with the connection,
1053    /// allowing the server to continue processing messages.
1054    ///
1055    /// # Event Loop
1056    ///
1057    /// Handler callbacks run on the event loop, which cannot process new messages while
1058    /// your handler is running. Use `spawn` for any expensive operations:
1059    ///
1060    /// ```no_run
1061    /// # use sacp_test::*;
1062    /// # async fn example() -> Result<(), sacp::Error> {
1063    /// # let connection = mock_connection();
1064    /// connection.on_receive_request(async |req: ProcessRequest, cx| {
1065    ///     // Clone cx for the spawned task
1066    ///     cx.connection_cx().spawn({
1067    ///         let connection_cx = cx.connection_cx();
1068    ///         async move {
1069    ///             let result = expensive_operation(&req.data).await?;
1070    ///             connection_cx.send_notification(ProcessComplete { result })?;
1071    ///             Ok(())
1072    ///         }
1073    ///     })?;
1074    ///
1075    ///     // Respond immediately
1076    ///     cx.respond(ProcessResponse { result: "started".into() })
1077    /// })
1078    /// # .serve(sacp_test::MockTransport).await?;
1079    /// # Ok(())
1080    /// # }
1081    /// ```
1082    ///
1083    /// # Errors
1084    ///
1085    /// If the spawned task returns an error, the entire server will shut down.
1086    #[track_caller]
1087    pub fn spawn(
1088        &self,
1089        task: impl IntoFuture<Output = Result<(), crate::Error>, IntoFuture: Send + 'static>,
1090    ) -> Result<(), crate::Error> {
1091        let location = std::panic::Location::caller();
1092        let task = task.into_future();
1093        self.spawn_task(Task::new(location, task))
1094    }
1095
1096    fn spawn_task(&self, task: Task) -> Result<(), crate::Error> {
1097        self.task_tx
1098            .unbounded_send(task)
1099            .map_err(crate::util::internal_error)
1100    }
1101
1102    /// Spawn a JSON-RPC connection in the background and return a [`JrConnectionCx`] for sending messages to it.
1103    ///
1104    /// This is useful for creating multiple connections that communicate with each other,
1105    /// such as implementing proxy patterns or connecting to multiple backend services.
1106    ///
1107    /// # Arguments
1108    ///
1109    /// - `connection`: The `JrConnection` to spawn (typically created via `JrHandlerChain::connect_to()`)
1110    /// - `serve_future`: A function that drives the connection (usually `|c| Box::pin(c.serve())`)
1111    ///
1112    /// # Returns
1113    ///
1114    /// A `JrConnectionCx` that you can use to send requests and notifications to the spawned connection.
1115    ///
1116    /// # Example: Proxying to a backend connection
1117    ///
1118    /// ```
1119    /// # use sacp::{JrHandlerChain, JrConnectionCx};
1120    /// # use sacp_test::*;
1121    /// # async fn example(cx: JrConnectionCx) -> Result<(), sacp::Error> {
1122    /// // Set up a backend connection
1123    /// let backend = JrHandlerChain::new()
1124    ///     .on_receive_request(async |req: MyRequest, request_cx| {
1125    ///         request_cx.respond(MyResponse { status: "ok".into() })
1126    ///     })
1127    ///     .connect_to(MockTransport)?;
1128    ///
1129    /// // Spawn it and get a context to send requests to it
1130    /// let backend_cx = cx.spawn_connection(backend, |c| Box::pin(c.serve()))?;
1131    ///
1132    /// // Now you can forward requests to the backend
1133    /// let response = backend_cx.send_request(MyRequest {}).block_task().await?;
1134    /// # Ok(())
1135    /// # }
1136    /// ```
1137    #[track_caller]
1138    pub fn spawn_connection<H: JrMessageHandler>(
1139        &self,
1140        connection: JrConnection<H>,
1141        serve_future: impl FnOnce(JrConnection<H>) -> BoxFuture<'static, Result<(), crate::Error>>,
1142    ) -> Result<JrConnectionCx, crate::Error> {
1143        let cx = connection.cx.clone();
1144        let future = serve_future(connection);
1145        let task = Task::new(std::panic::Location::caller(), future);
1146        self.spawn_task(task)?;
1147        Ok(cx)
1148    }
1149
1150    /// Send a request/notification and forward the response appropriately.
1151    ///
1152    /// The request context's response type matches the request's response type,
1153    /// enabling type-safe message forwarding.
1154    pub fn send_proxied_message<R, N>(
1155        &self,
1156        message: MessageAndCx<R, N>,
1157    ) -> Result<(), crate::Error>
1158    where
1159        R: JrRequest<Response: Send>,
1160        N: JrNotification,
1161    {
1162        match message {
1163            MessageAndCx::Request(request, request_cx) => {
1164                self.send_request(request).forward_to_request_cx(request_cx)
1165            }
1166            MessageAndCx::Notification(notification, _) => self.send_notification(notification),
1167        }
1168    }
1169
1170    /// Send an outgoing request and return a [`JrResponse`] for handling the reply.
1171    ///
1172    /// The returned [`JrResponse`] provides methods for receiving the response without
1173    /// blocking the event loop:
1174    ///
1175    /// * [`await_when_result_received`](JrResponse::await_when_result_received) - Schedule
1176    ///   a callback to run when the response arrives (doesn't block the event loop)
1177    /// * [`block_task`](JrResponse::block_task) - Block the current task until the response
1178    ///   arrives (only safe in spawned tasks, not in handlers)
1179    ///
1180    /// # Anti-Footgun Design
1181    ///
1182    /// The API intentionally makes it difficult to block on the result directly to prevent
1183    /// the common mistake of blocking the event loop while waiting for a response:
1184    ///
1185    /// ```compile_fail
1186    /// # use sacp_test::*;
1187    /// # async fn example(cx: sacp::JrConnectionCx) -> Result<(), sacp::Error> {
1188    /// // ❌ This doesn't compile - prevents blocking the event loop
1189    /// let response = cx.send_request(MyRequest {}).await?;
1190    /// # Ok(())
1191    /// # }
1192    /// ```
1193    ///
1194    /// ```no_run
1195    /// # use sacp_test::*;
1196    /// # async fn example(cx: sacp::JrConnectionCx) -> Result<(), sacp::Error> {
1197    /// // ✅ Option 1: Schedule callback (safe in handlers)
1198    /// cx.send_request(MyRequest {})
1199    ///     .await_when_result_received(async |result| {
1200    ///         // Handle the response
1201    ///         Ok(())
1202    ///     })?;
1203    ///
1204    /// // ✅ Option 2: Block in spawned task (safe because task is concurrent)
1205    /// cx.spawn({
1206    ///     let cx = cx.clone();
1207    ///     async move {
1208    ///         let response = cx.send_request(MyRequest {})
1209    ///             .block_task()
1210    ///             .await?;
1211    ///         // Process response...
1212    ///         Ok(())
1213    ///     }
1214    /// })?;
1215    /// # Ok(())
1216    /// # }
1217    /// ```
1218    pub fn send_request<Req: JrRequest>(&self, request: Req) -> JrResponse<Req::Response> {
1219        let method = request.method().to_string();
1220        let (response_tx, response_rx) = oneshot::channel();
1221        match request.to_untyped_message() {
1222            Ok(untyped) => {
1223                let params = crate::util::json_cast(untyped.params).ok();
1224                let message = OutgoingMessage::Request {
1225                    method: method.clone(),
1226                    params,
1227                    response_tx,
1228                };
1229
1230                match self.message_tx.unbounded_send(message) {
1231                    Ok(()) => (),
1232                    Err(error) => {
1233                        let OutgoingMessage::Request {
1234                            method,
1235                            response_tx,
1236                            ..
1237                        } = error.into_inner()
1238                        else {
1239                            unreachable!();
1240                        };
1241
1242                        response_tx
1243                            .send(Err(communication_failure(format!(
1244                                "failed to send outgoing request `{method}"
1245                            ))))
1246                            .unwrap();
1247                    }
1248                }
1249            }
1250
1251            Err(_) => {
1252                response_tx
1253                    .send(Err(communication_failure(format!(
1254                        "failed to create untyped request for `{method}"
1255                    ))))
1256                    .unwrap();
1257            }
1258        }
1259
1260        JrResponse::new(method.clone(), self.clone(), response_rx)
1261            .map(move |json| <Req::Response>::from_value(&method, json))
1262    }
1263
1264    /// Send an outgoing notification (no reply expected).
1265    ///
1266    /// Notifications are fire-and-forget messages that don't have IDs and don't expect responses.
1267    /// This method sends the notification immediately and returns.
1268    ///
1269    /// ```no_run
1270    /// # use sacp_test::*;
1271    /// # async fn example(cx: sacp::JrConnectionCx) -> Result<(), sacp::Error> {
1272    /// cx.send_notification(StatusUpdate {
1273    ///     message: "Processing...".into(),
1274    /// })?;
1275    /// # Ok(())
1276    /// # }
1277    /// ```
1278    pub fn send_notification<N: JrNotification>(
1279        &self,
1280        notification: N,
1281    ) -> Result<(), crate::Error> {
1282        let untyped = notification.to_untyped_message()?;
1283        let params = crate::util::json_cast(untyped.params).ok();
1284        self.send_raw_message(OutgoingMessage::Notification {
1285            method: untyped.method,
1286            params,
1287        })
1288    }
1289
1290    /// Send an error notification (no reply expected).
1291    pub fn send_error_notification(&self, error: crate::Error) -> Result<(), crate::Error> {
1292        self.send_raw_message(OutgoingMessage::Error { error })
1293    }
1294
1295    fn send_raw_message(&self, message: OutgoingMessage) -> Result<(), crate::Error> {
1296        match &message {
1297            OutgoingMessage::Response { id, response } => match response {
1298                Ok(_) => tracing::debug!(?id, "send_raw_message: queuing success response"),
1299                Err(e) => tracing::warn!(?id, ?e, "send_raw_message: queuing error response"),
1300            },
1301            _ => {}
1302        }
1303        self.message_tx
1304            .unbounded_send(message)
1305            .map_err(communication_failure)
1306    }
1307}
1308
1309/// The context to respond to an incoming request.
1310///
1311/// This context is provided to request handlers and serves a dual role:
1312///
1313/// 1. **Respond to the request** - Use [`respond`](Self::respond) or
1314///    [`respond_with_result`](Self::respond_with_result) to send the response
1315/// 2. **Send other messages** - Use [`connection_cx`](Self::connection_cx) to access the
1316///    underlying [`JrConnectionCx`], giving access to
1317///    [`send_request`](JrConnectionCx::send_request),
1318///    [`send_notification`](JrConnectionCx::send_notification), and
1319///    [`spawn`](JrConnectionCx::spawn)
1320///
1321/// # Example
1322///
1323/// ```no_run
1324/// # use sacp_test::*;
1325/// # async fn example() -> Result<(), sacp::Error> {
1326/// # let connection = mock_connection();
1327/// connection.on_receive_request(async |req: ProcessRequest, cx| {
1328///     // Send a notification while processing
1329///     cx.connection_cx().send_notification(StatusUpdate {
1330///         message: "processing".into(),
1331///     })?;
1332///
1333///     // Do some work...
1334///     let result = process(&req.data)?;
1335///
1336///     // Respond to the request
1337///     cx.respond(ProcessResponse { result })
1338/// })
1339/// # .serve(sacp_test::MockTransport).await?;
1340/// # Ok(())
1341/// # }
1342/// ```
1343///
1344/// # Event Loop Considerations
1345///
1346/// Like all handlers, request handlers run on the event loop. Use
1347/// [`spawn`](JrConnectionCx::spawn) for expensive operations to avoid blocking
1348/// the connection.
1349///
1350/// See the [Event Loop and Concurrency](JrConnection#event-loop-and-concurrency)
1351/// section for more details.
1352#[must_use]
1353pub struct JrRequestCx<T: JrResponsePayload> {
1354    /// The context to use to send outgoing messages and replies.
1355    cx: JrConnectionCx,
1356
1357    /// The method of the request.
1358    method: String,
1359
1360    /// The `id` of the message we are replying to.
1361    id: jsonrpcmsg::Id,
1362
1363    /// Function to send the response `T` to a request with the given method and id.
1364    make_json: SendBoxFnOnce<
1365        'static,
1366        (String, Result<T, crate::Error>),
1367        Result<serde_json::Value, crate::Error>,
1368    >,
1369}
1370
1371impl<T: JrResponsePayload> std::fmt::Debug for JrRequestCx<T> {
1372    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1373        f.debug_struct("JrRequestCx")
1374            .field("cx", &self.cx)
1375            .field("method", &self.method)
1376            .field("id", &self.id)
1377            .field("response_type", &std::any::type_name::<T>())
1378            .finish()
1379    }
1380}
1381
1382impl JrRequestCx<serde_json::Value> {
1383    /// Create a new method context.
1384    fn new(cx: &JrConnectionCx, method: String, id: jsonrpcmsg::Id) -> Self {
1385        Self {
1386            cx: cx.clone(),
1387            method,
1388            id,
1389            make_json: SendBoxFnOnce::new(move |_method, value| value),
1390        }
1391    }
1392
1393    /// Cast this request context to a different response type
1394    pub fn cast<T: JrResponsePayload>(self) -> JrRequestCx<T> {
1395        self.wrap_params(move |method, value| match value {
1396            Ok(value) => T::into_json(value, &method),
1397            Err(e) => Err(e),
1398        })
1399    }
1400}
1401
1402impl<T: JrResponsePayload> JrRequestCx<T> {
1403    /// Method of the incoming request
1404    pub fn method(&self) -> &str {
1405        &self.method
1406    }
1407
1408    /// ID of the incoming request as a JSON value
1409    pub fn id(&self) -> serde_json::Value {
1410        match &self.id {
1411            jsonrpcmsg::Id::Number(n) => serde_json::Value::Number((*n).into()),
1412            jsonrpcmsg::Id::String(s) => serde_json::Value::String(s.clone()),
1413            jsonrpcmsg::Id::Null => serde_json::Value::Null,
1414        }
1415    }
1416
1417    /// Convert to a `JrRequestCx` that expects a JSON value
1418    /// and which checks (dynamically) that the JSON value it receives
1419    /// can be converted to `T`.
1420    pub fn erase_to_json(self) -> JrRequestCx<serde_json::Value> {
1421        self.wrap_params(|method, value| T::from_value(&method, value?))
1422    }
1423
1424    /// Return a new JrResponse that expects a response of type U and serializes it.
1425    pub fn wrap_method(self, method: String) -> JrRequestCx<T> {
1426        JrRequestCx {
1427            cx: self.cx,
1428            method,
1429            id: self.id,
1430            make_json: self.make_json,
1431        }
1432    }
1433
1434    /// Return a new JrResponse that expects a response of type U and serializes it.
1435    ///
1436    /// `wrap_fn` will be invoked with the method name and the result of the wrapped function.
1437    pub fn wrap_params<U: JrResponsePayload>(
1438        self,
1439        wrap_fn: impl FnOnce(&str, Result<U, crate::Error>) -> Result<T, crate::Error> + Send + 'static,
1440    ) -> JrRequestCx<U> {
1441        JrRequestCx {
1442            cx: self.cx,
1443            method: self.method,
1444            id: self.id,
1445            make_json: SendBoxFnOnce::new(move |method: String, input: Result<U, crate::Error>| {
1446                let t_value = wrap_fn(&method, input);
1447                self.make_json.call(method, t_value)
1448            }),
1449        }
1450    }
1451
1452    /// Get the underlying JSON RPC context.
1453    pub fn connection_cx(&self) -> JrConnectionCx {
1454        self.cx.clone()
1455    }
1456
1457    /// Respond to the JSON-RPC request with either a value (`Ok`) or an error (`Err`).
1458    pub fn respond_with_result(
1459        self,
1460        response: Result<T, crate::Error>,
1461    ) -> Result<(), crate::Error> {
1462        tracing::debug!(id = ?self.id, "respond called");
1463        let json = self.make_json.call_tuple((self.method.clone(), response));
1464        self.cx.send_raw_message(OutgoingMessage::Response {
1465            id: self.id,
1466            response: json,
1467        })
1468    }
1469
1470    /// Respond to the JSON-RPC request with a value.
1471    pub fn respond(self, response: T) -> Result<(), crate::Error> {
1472        self.respond_with_result(Ok(response))
1473    }
1474
1475    /// Respond to the JSON-RPC request with an internal error containing a message.
1476    pub fn respond_with_internal_error(self, message: impl ToString) -> Result<(), crate::Error> {
1477        self.respond_with_error(crate::util::internal_error(message))
1478    }
1479
1480    /// Respond to the JSON-RPC request with an error.
1481    pub fn respond_with_error(self, error: crate::Error) -> Result<(), crate::Error> {
1482        tracing::debug!(id = ?self.id, ?error, "respond_with_error called");
1483        self.respond_with_result(Err(error))
1484    }
1485}
1486
1487/// Common bounds for any JSON-RPC message.
1488pub trait JrMessage: 'static + Debug + Sized + Send + Clone {
1489    /// The parameters for the request.
1490    fn to_untyped_message(&self) -> Result<UntypedMessage, crate::Error>;
1491
1492    /// The method name for the request.
1493    fn method(&self) -> &str;
1494
1495    /// Attempt to parse this type from a JSON-RPC request.
1496    ///
1497    /// Returns:
1498    /// - `None` if this type does not recognize the method name or recognizes it as a notification
1499    /// - `Some(Ok(value))` if the method is recognized as a request and deserialization succeeds
1500    /// - `Some(Err(error))` if the method is recognized as a request but deserialization fails
1501    fn parse_request(_method: &str, _params: &impl Serialize)
1502    -> Option<Result<Self, crate::Error>>;
1503
1504    /// Attempt to parse this type from a JSON-RPC notification.
1505    ///
1506    /// Returns:
1507    /// - `None` if this type does not recognize the method name or recognizes it as a request
1508    /// - `Some(Ok(value))` if the method is recognized as a notification and deserialization succeeds
1509    /// - `Some(Err(error))` if the method is recognized as a notification but deserialization fails
1510    fn parse_notification(
1511        _method: &str,
1512        _params: &impl Serialize,
1513    ) -> Option<Result<Self, crate::Error>>;
1514}
1515
1516/// Defines the "payload" of a successful response to a JSON-RPC request.
1517pub trait JrResponsePayload: 'static + Debug + Sized + Send {
1518    /// Convert this message into a JSON value.
1519    fn into_json(self, method: &str) -> Result<serde_json::Value, crate::Error>;
1520
1521    /// Parse a JSON value into the response type.
1522    fn from_value(method: &str, value: serde_json::Value) -> Result<Self, crate::Error>;
1523}
1524
1525impl JrResponsePayload for serde_json::Value {
1526    fn from_value(_method: &str, value: serde_json::Value) -> Result<Self, crate::Error> {
1527        Ok(value)
1528    }
1529
1530    fn into_json(self, _method: &str) -> Result<serde_json::Value, crate::Error> {
1531        Ok(self)
1532    }
1533}
1534
1535/// A struct that represents a notification (JSON-RPC message that does not expect a response).
1536pub trait JrNotification: JrMessage {}
1537
1538/// A struct that represents a request (JSON-RPC message expecting a response).
1539pub trait JrRequest: JrMessage {
1540    /// The type of data expected in response.
1541    type Response: JrResponsePayload;
1542}
1543
1544/// An enum capturing an in-flight request or notification.
1545/// In the case of a request, also includes the context used to respond to the request.
1546///
1547/// Type parameters allow specifying the concrete request and notification types.
1548/// By default, both are `UntypedMessage` for dynamic dispatch.
1549/// The request context's response type matches the request's response type.
1550#[derive(Debug)]
1551pub enum MessageAndCx<R: JrRequest = UntypedMessage, N: JrMessage = UntypedMessage> {
1552    /// Incoming request and the context where the response should be sent.
1553    Request(R, JrRequestCx<R::Response>),
1554
1555    /// Incoming notification.
1556    Notification(N, JrConnectionCx),
1557}
1558
1559impl<R: JrRequest, N: JrMessage> MessageAndCx<R, N> {
1560    /// Map the request and notification types to new types.
1561    pub fn map<R1, N1>(
1562        self,
1563        map_request: impl FnOnce(R, JrRequestCx<R::Response>) -> (R1, JrRequestCx<R1::Response>),
1564        map_notification: impl FnOnce(N, JrConnectionCx) -> (N1, JrConnectionCx),
1565    ) -> MessageAndCx<R1, N1>
1566    where
1567        R1: JrRequest<Response: Send>,
1568        N1: JrMessage,
1569    {
1570        match self {
1571            MessageAndCx::Request(request, cx) => {
1572                let (new_request, new_cx) = map_request(request, cx);
1573                MessageAndCx::Request(new_request, new_cx)
1574            }
1575            MessageAndCx::Notification(notification, cx) => {
1576                let (new_notification, new_cx) = map_notification(notification, cx);
1577                MessageAndCx::Notification(new_notification, new_cx)
1578            }
1579        }
1580    }
1581
1582    /// Respond to the message with an error.
1583    ///
1584    /// If this message is a request, this error becomes the reply to the request.
1585    ///
1586    /// If this message is a notification, the error is sent as a notification.
1587    pub fn respond_with_error(self, error: crate::Error) -> Result<(), crate::Error> {
1588        match self {
1589            MessageAndCx::Request(_, cx) => cx.respond_with_error(error),
1590            MessageAndCx::Notification(_, cx) => cx.send_error_notification(error),
1591        }
1592    }
1593
1594    /// Convert to a `JrRequestCx` that expects a JSON value
1595    /// and which checks (dynamically) that the JSON value it receives
1596    /// can be converted to `T`.
1597    pub fn erase_to_json(self) -> Result<MessageAndCx, crate::Error> {
1598        match self {
1599            MessageAndCx::Request(response, request_cx) => Ok(MessageAndCx::Request(
1600                response.to_untyped_message()?,
1601                request_cx.erase_to_json(),
1602            )),
1603            MessageAndCx::Notification(notification, cx) => Ok(MessageAndCx::Notification(
1604                notification.to_untyped_message()?,
1605                cx,
1606            )),
1607        }
1608    }
1609
1610    /// Convert the message to an untyped message.
1611    pub fn to_untyped_message(&self) -> Result<UntypedMessage, crate::Error> {
1612        match self {
1613            MessageAndCx::Request(request, _) => request.to_untyped_message(),
1614            MessageAndCx::Notification(notification, _) => notification.to_untyped_message(),
1615        }
1616    }
1617
1618    /// Returns the request ID if this is a request, None if notification.
1619    pub fn id(&self) -> Option<serde_json::Value> {
1620        match self {
1621            MessageAndCx::Request(_, cx) => Some(cx.id()),
1622            MessageAndCx::Notification(_, _) => None,
1623        }
1624    }
1625}
1626
1627impl MessageAndCx {
1628    /// Returns the method of the message (only available for UntypedMessage).
1629    pub fn method(&self) -> &str {
1630        match self {
1631            MessageAndCx::Request(msg, _) => &msg.method,
1632            MessageAndCx::Notification(msg, _) => &msg.method,
1633        }
1634    }
1635
1636    /// Returns the message of the message (only available for UntypedMessage).
1637    pub fn message(&self) -> &UntypedMessage {
1638        match self {
1639            MessageAndCx::Request(msg, _) => msg,
1640            MessageAndCx::Notification(msg, _) => msg,
1641        }
1642    }
1643}
1644
/// An incoming JSON message without any typing. Can be a request or a notification.
///
/// This is the default type for both parameters of [`MessageAndCx`]; it stores the
/// method name next to the still-unparsed parameters.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct UntypedMessage {
    /// The JSON-RPC method name
    pub method: String,
    /// The JSON-RPC parameters as a raw JSON value
    pub params: serde_json::Value,
}
1653
1654impl UntypedMessage {
1655    /// Returns an untyped message with the given method and parameters.
1656    pub fn new(method: &str, params: impl Serialize) -> Result<Self, crate::Error> {
1657        let params = serde_json::to_value(params)?;
1658        Ok(Self {
1659            method: method.to_string(),
1660            params,
1661        })
1662    }
1663
1664    /// Returns the method name
1665    pub fn method(&self) -> &str {
1666        &self.method
1667    }
1668
1669    /// Returns the parameters as a JSON value
1670    pub fn params(&self) -> &serde_json::Value {
1671        &self.params
1672    }
1673
1674    /// Consumes this message and returns the method and params
1675    pub fn into_parts(self) -> (String, serde_json::Value) {
1676        (self.method, self.params)
1677    }
1678}
1679
1680impl JrMessage for UntypedMessage {
1681    fn method(&self) -> &str {
1682        &self.method
1683    }
1684
1685    fn to_untyped_message(&self) -> Result<UntypedMessage, crate::Error> {
1686        Ok(self.clone())
1687    }
1688
1689    fn parse_request(method: &str, params: &impl Serialize) -> Option<Result<Self, crate::Error>> {
1690        Some(UntypedMessage::new(method, params))
1691    }
1692
1693    fn parse_notification(
1694        method: &str,
1695        params: &impl Serialize,
1696    ) -> Option<Result<Self, crate::Error>> {
1697        Some(UntypedMessage::new(method, params))
1698    }
1699}
1700
/// An untyped message can act as a request; its response is left as raw JSON.
impl JrRequest for UntypedMessage {
    // Raw JSON in, raw JSON out: no typed response is known for an untyped request.
    type Response = serde_json::Value;
}
1704
/// An untyped message can also act as a notification (marker impl, no items).
impl JrNotification for UntypedMessage {}
1706
/// Represents a pending response of type `R` from an outgoing request.
///
/// Returned by [`JrConnectionCx::send_request`], this type provides methods for handling
/// the response without blocking the event loop. The API is intentionally designed to make
/// it difficult to accidentally block.
///
/// # Anti-Footgun Design
///
/// You cannot directly `.await` a `JrResponse`. Instead, you must choose how to handle
/// the response:
///
/// ## Option 1: Schedule a Callback (Safe in Handlers)
///
/// Use [`await_when_result_received`](Self::await_when_result_received) to schedule a task
/// that runs when the response arrives. This doesn't block the event loop:
///
/// ```no_run
/// # use sacp_test::*;
/// # async fn example(cx: sacp::JrConnectionCx) -> Result<(), sacp::Error> {
/// cx.send_request(MyRequest {})
///     .await_when_result_received(async |result| {
///         match result {
///             Ok(response) => {
///                 // Handle successful response
///                 Ok(())
///             }
///             Err(error) => {
///                 // Handle error
///                 Err(error)
///             }
///         }
///     })?;
/// # Ok(())
/// # }
/// ```
///
/// ## Option 2: Block in a Spawned Task (Safe Only in `spawn`)
///
/// Use [`block_task`](Self::block_task) to block until the response arrives, but **only**
/// in a spawned task (never in a handler):
///
/// ```no_run
/// # use sacp_test::*;
/// # async fn example(cx: sacp::JrConnectionCx) -> Result<(), sacp::Error> {
/// // ✅ Safe: Spawned task runs concurrently
/// cx.spawn({
///     let cx = cx.clone();
///     async move {
///         let response = cx.send_request(MyRequest {})
///             .block_task()
///             .await?;
///         // Process response...
///         Ok(())
///     }
/// })?;
/// # Ok(())
/// # }
/// ```
///
/// ```no_run
/// # use sacp_test::*;
/// # async fn example() -> Result<(), sacp::Error> {
/// # let connection = mock_connection();
/// // ❌ NEVER do this in a handler - blocks the event loop!
/// connection.on_receive_request(async |req: MyRequest, cx| {
///     let response = cx.connection_cx().send_request(MyRequest {})
///         .block_task()  // This will deadlock!
///         .await?;
///     cx.respond(response)
/// })
/// # .serve(sacp_test::MockTransport).await?;
/// # Ok(())
/// # }
/// ```
///
/// # Why This Design?
///
/// If you block the event loop while waiting for a response, the connection cannot process
/// the incoming response message, creating a deadlock. This API design prevents that footgun
/// by making blocking explicit and encouraging non-blocking patterns.
pub struct JrResponse<R> {
    /// Method name of the request this response belongs to (used in error messages).
    method: String,
    /// Connection context; used to spawn the callback task when a handler is scheduled.
    connection_cx: JrConnectionCx,
    /// Receives the raw outcome of the request: a JSON value on success, or an error.
    response_rx: oneshot::Receiver<Result<serde_json::Value, crate::Error>>,
    /// Converts the raw JSON value into `R`. Starts as the identity (`Ok`) and is
    /// composed with each closure passed to `map`.
    to_result: Box<dyn Fn(serde_json::Value) -> Result<R, crate::Error> + Send>,
}
1793
1794impl JrResponse<serde_json::Value> {
1795    fn new(
1796        method: String,
1797        connection_cx: JrConnectionCx,
1798        response_rx: oneshot::Receiver<Result<serde_json::Value, crate::Error>>,
1799    ) -> Self {
1800        Self {
1801            method,
1802            response_rx,
1803            connection_cx,
1804            to_result: Box::new(Ok),
1805        }
1806    }
1807}
1808
1809impl<R: JrResponsePayload> JrResponse<R> {
1810    /// The method of the request this is in response to.
1811    pub fn method(&self) -> &str {
1812        &self.method
1813    }
1814
1815    /// Create a new response that maps the result of the response to a new type.
1816    pub fn map<U>(
1817        self,
1818        map_fn: impl Fn(R) -> Result<U, crate::Error> + 'static + Send,
1819    ) -> JrResponse<U>
1820    where
1821        U: JrResponsePayload,
1822    {
1823        JrResponse {
1824            method: self.method,
1825            response_rx: self.response_rx,
1826            connection_cx: self.connection_cx,
1827            to_result: Box::new(move |value| map_fn((self.to_result)(value)?)),
1828        }
1829    }
1830
1831    /// Forward the response (success or error) to a request context when it arrives.
1832    ///
1833    /// This is a convenience method for proxying messages between connections. When the
1834    /// response arrives, it will be automatically sent to the provided request context,
1835    /// whether it's a successful response or an error.
1836    ///
1837    /// # Example: Proxying requests
1838    ///
1839    /// ```
1840    /// # use sacp::{JrHandlerChain, JrConnectionCx};
1841    /// # use sacp_test::*;
1842    /// # async fn example(cx: JrConnectionCx) -> Result<(), sacp::Error> {
1843    /// // Set up backend connection
1844    /// let backend = JrHandlerChain::new()
1845    ///     .on_receive_request(async |req: MyRequest, request_cx| {
1846    ///         request_cx.respond(MyResponse { status: "ok".into() })
1847    ///     })
1848    ///     .connect_to(MockTransport)?;
1849    ///
1850    /// // Spawn backend and get a context to send to it
1851    /// let backend_cx = cx.spawn_connection(backend, |c| Box::pin(c.serve()))?;
1852    ///
1853    /// // Set up proxy that forwards requests to backend
1854    /// JrHandlerChain::new()
1855    ///     .on_receive_request({
1856    ///         let backend_cx = backend_cx.clone();
1857    ///         async move |req: MyRequest, request_cx| {
1858    ///             // Forward the request to backend and proxy the response back
1859    ///             backend_cx.send_request(req)
1860    ///                 .forward_to_request_cx(request_cx)?;
1861    ///             Ok(())
1862    ///         }
1863    ///     });
1864    /// # Ok(())
1865    /// # }
1866    /// ```
1867    ///
1868    /// # Type Safety
1869    ///
1870    /// The request context's response type must match the request's response type,
1871    /// ensuring type-safe message forwarding.
1872    ///
1873    /// # When to Use
1874    ///
1875    /// Use this when:
1876    /// - You're implementing a proxy or gateway pattern
1877    /// - You want to forward responses without processing them
1878    /// - The response types match between the outgoing request and incoming request
1879    ///
1880    /// This is equivalent to calling `await_when_result_received` and manually forwarding
1881    /// the result, but more concise.
1882    pub fn forward_to_request_cx(self, request_cx: JrRequestCx<R>) -> Result<(), crate::Error>
1883    where
1884        R: Send,
1885    {
1886        self.await_when_result_received(async move |result| request_cx.respond_with_result(result))
1887    }
1888
1889    /// Block the current task until the response is received.
1890    ///
1891    /// **Warning:** This method blocks the current async task. It is **only safe** to use
1892    /// in spawned tasks created with [`JrConnectionCx::spawn`]. Using it directly in a
1893    /// handler callback will deadlock the connection.
1894    ///
1895    /// # Safe Usage (in spawned tasks)
1896    ///
1897    /// ```no_run
1898    /// # use sacp_test::*;
1899    /// # async fn example() -> Result<(), sacp::Error> {
1900    /// # let connection = mock_connection();
1901    /// connection.on_receive_request(async |req: MyRequest, cx| {
1902    ///     // Spawn a task to handle the request
1903    ///     cx.connection_cx().spawn({
1904    ///         let connection_cx = cx.connection_cx();
1905    ///         async move {
1906    ///             // Safe: We're in a spawned task, not blocking the event loop
1907    ///             let response = connection_cx.send_request(OtherRequest {})
1908    ///                 .block_task()
1909    ///                 .await?;
1910    ///
1911    ///             // Process the response...
1912    ///             Ok(())
1913    ///         }
1914    ///     })?;
1915    ///
1916    ///     // Respond immediately
1917    ///     cx.respond(MyResponse { status: "ok".into() })
1918    /// })
1919    /// # .serve(sacp_test::MockTransport).await?;
1920    /// # Ok(())
1921    /// # }
1922    /// ```
1923    ///
1924    /// # Unsafe Usage (in handlers - will deadlock!)
1925    ///
1926    /// ```no_run
1927    /// # use sacp_test::*;
1928    /// # async fn example() -> Result<(), sacp::Error> {
1929    /// # let connection = mock_connection();
1930    /// connection.on_receive_request(async |req: MyRequest, cx| {
1931    ///     // ❌ DEADLOCK: Handler blocks event loop, which can't process the response
1932    ///     let response = cx.connection_cx().send_request(OtherRequest {})
1933    ///         .block_task()
1934    ///         .await?;
1935    ///
1936    ///     cx.respond(MyResponse { status: response.value })
1937    /// })
1938    /// # .serve(sacp_test::MockTransport).await?;
1939    /// # Ok(())
1940    /// # }
1941    /// ```
1942    ///
1943    /// # When to Use
1944    ///
1945    /// Use this method when:
1946    /// - You're in a spawned task (via [`JrConnectionCx::spawn`])
1947    /// - You need the response value to proceed with your logic
1948    /// - Linear control flow is more natural than callbacks
1949    ///
1950    /// For handler callbacks, use [`await_when_result_received`](Self::await_when_result_received) instead.
1951    pub async fn block_task(self) -> Result<R, crate::Error>
1952    where
1953        R: Send,
1954    {
1955        match self.response_rx.await {
1956            Ok(Ok(json_value)) => match (self.to_result)(json_value) {
1957                Ok(value) => Ok(value),
1958                Err(err) => Err(err),
1959            },
1960            Ok(Err(err)) => Err(err),
1961            Err(err) => Err(crate::util::internal_error(format!(
1962                "response to `{}` never received: {}",
1963                self.method, err
1964            ))),
1965        }
1966    }
1967
1968    /// Schedule an async task to run when a successful response is received.
1969    ///
1970    /// This is a convenience wrapper around [`await_when_result_received`](Self::await_when_result_received)
1971    /// for the common pattern of forwarding errors to a request context while only processing
1972    /// successful responses.
1973    ///
1974    /// # Behavior
1975    ///
1976    /// - If the response is `Ok(value)`, your task receives the value and the request context
1977    /// - If the response is `Err(error)`, the error is automatically sent to `request_cx`
1978    ///   and your task is not called
1979    ///
1980    /// # Example: Chaining requests
1981    ///
1982    /// ```no_run
1983    /// # use sacp_test::*;
1984    /// # async fn example() -> Result<(), sacp::Error> {
1985    /// # let connection = mock_connection();
1986    /// connection.on_receive_request(async |req: ValidateRequest, request_cx| {
1987    ///     // Send initial request
1988    ///     request_cx.connection_cx().send_request(ValidateRequest { data: req.data.clone() })
1989    ///         .await_when_ok_response_received(request_cx, async |validation, request_cx| {
1990    ///             // Only runs if validation succeeded
1991    ///             if validation.is_valid {
1992    ///                 // Respond to original request
1993    ///                 request_cx.respond(ValidateResponse { is_valid: true, error: None })
1994    ///             } else {
1995    ///                 request_cx.respond_with_error(sacp::util::internal_error("validation failed"))
1996    ///             }
1997    ///         })?;
1998    ///
1999    ///     Ok(())
2000    /// })
2001    /// # .serve(sacp_test::MockTransport).await?;
2002    /// # Ok(())
2003    /// # }
2004    /// ```
2005    ///
2006    /// # When to Use
2007    ///
2008    /// Use this when:
2009    /// - You need to respond to a request based on another request's result
2010    /// - You want errors to automatically propagate to the request context
2011    /// - You only care about the success case
2012    ///
2013    /// For more control over error handling, use [`await_when_result_received`](Self::await_when_result_received).
2014    #[track_caller]
2015    pub fn await_when_ok_response_received<F>(
2016        self,
2017        request_cx: JrRequestCx<R>,
2018        task: impl FnOnce(R, JrRequestCx<R>) -> F + 'static + Send,
2019    ) -> Result<(), crate::Error>
2020    where
2021        F: Future<Output = Result<(), crate::Error>> + 'static + Send,
2022        R: Send,
2023    {
2024        self.await_when_result_received(async move |result| match result {
2025            Ok(value) => task(value, request_cx).await,
2026            Err(err) => request_cx.respond_with_error(err),
2027        })
2028    }
2029
2030    /// Schedule an async task to run when the response is received.
2031    ///
2032    /// This is the recommended way to handle responses in handler callbacks, as it doesn't
2033    /// block the event loop. The task will be spawned automatically when the response arrives.
2034    ///
2035    /// # Example: Handle response in callback
2036    ///
2037    /// ```no_run
2038    /// # use sacp_test::*;
2039    /// # async fn example() -> Result<(), sacp::Error> {
2040    /// # let connection = mock_connection();
2041    /// connection.on_receive_request(async |req: MyRequest, cx| {
2042    ///     // Send a request and schedule a callback for the response
2043    ///     cx.connection_cx().send_request(QueryRequest { id: 22 })
2044    ///         .await_when_result_received({
2045    ///             let connection_cx = cx.connection_cx();
2046    ///             async move |result| {
2047    ///                 match result {
2048    ///                     Ok(response) => {
2049    ///                         println!("Got response: {:?}", response);
2050    ///                         // Can send more messages here
2051    ///                         connection_cx.send_notification(QueryComplete {})?;
2052    ///                         Ok(())
2053    ///                 }
2054    ///                     Err(error) => {
2055    ///                         eprintln!("Request failed: {}", error);
2056    ///                         Err(error)
2057    ///                     }
2058    ///                 }
2059    ///             }
2060    ///         })?;
2061    ///
2062    ///     // Handler continues immediately without waiting
2063    ///     cx.respond(MyResponse { status: "processing".into() })
2064    /// })
2065    /// # .serve(sacp_test::MockTransport).await?;
2066    /// # Ok(())
2067    /// # }
2068    /// ```
2069    ///
2070    /// # Event Loop Safety
2071    ///
2072    /// Unlike [`block_task`](Self::block_task), this method is safe to use in handlers because
2073    /// it schedules the task to run later rather than blocking the current task. The event loop
2074    /// remains free to process messages, including the response itself.
2075    ///
2076    /// # Error Handling
2077    ///
2078    /// If the scheduled task returns `Err`, the entire server will shut down. Make sure to handle
2079    /// errors appropriately within your task.
2080    ///
2081    /// # When to Use
2082    ///
2083    /// Use this method when:
2084    /// - You're in a handler callback (not a spawned task)
2085    /// - You want to process the response asynchronously
2086    /// - You don't need the response value immediately
2087    ///
2088    /// For spawned tasks where you need linear control flow, consider [`block_task`](Self::block_task).
2089    #[track_caller]
2090    pub fn await_when_result_received<F>(
2091        self,
2092        task: impl FnOnce(Result<R, crate::Error>) -> F + 'static + Send,
2093    ) -> Result<(), crate::Error>
2094    where
2095        F: Future<Output = Result<(), crate::Error>> + 'static + Send,
2096        R: Send,
2097    {
2098        let connection_cx = self.connection_cx.clone();
2099        let block_task = self.block_task();
2100        connection_cx.spawn(async move { task(block_task.await).await })
2101    }
2102}
2103
/// JSON-RPC error code attached to errors built by `communication_failure`.
///
/// `-32000` lies in the range the JSON-RPC 2.0 specification reserves for
/// implementation-defined server errors (`-32000` to `-32099`).
const COMMUNICATION_FAILURE: i32 = -32000;
2105
2106fn communication_failure(err: impl ToString) -> crate::Error {
2107    crate::Error::new((COMMUNICATION_FAILURE, err.to_string()))
2108}
2109
2110// ============================================================================
// Component Implementations (line streams, byte streams, channels)
2112// ============================================================================
2113
/// A component that communicates over line streams.
///
/// `Lines` implements the [`Component`] trait for any pair of line-based streams
/// (a `Stream<Item = io::Result<String>>` for incoming and a `Sink<String>` for outgoing),
/// handling serialization of JSON-RPC messages to/from newline-delimited JSON.
///
/// This is a lower-level primitive than [`ByteStreams`] that enables interception and
/// transformation of individual lines before they are parsed or after they are serialized.
/// This is particularly useful for debugging, logging, or implementing custom line-based
/// protocols.
///
/// # Use Cases
///
/// - **Line-by-line logging**: Intercept and log each line before parsing
/// - **Custom protocols**: Transform lines before/after JSON-RPC processing
/// - **Debugging**: Inspect raw message strings
/// - **Line filtering**: Skip or modify specific messages
///
/// Most users should use [`ByteStreams`] instead, which provides a simpler interface
/// for byte-based I/O.
///
/// [`Component`]: crate::Component
pub struct Lines<OutgoingSink, IncomingStream> {
    /// Outgoing line sink (where we write serialized JSON-RPC messages).
    ///
    /// [`ByteStreams`] appends the `\n` terminator itself when it adapts a byte
    /// writer into this sink.
    pub outgoing: OutgoingSink,
    /// Incoming line stream (where we read and parse JSON-RPC messages).
    ///
    /// Each item is either one line of text or the I/O error hit while reading it.
    pub incoming: IncomingStream,
}
2142
2143impl<OutgoingSink, IncomingStream> Lines<OutgoingSink, IncomingStream>
2144where
2145    OutgoingSink: futures::Sink<String, Error = std::io::Error> + Send + 'static,
2146    IncomingStream: futures::Stream<Item = std::io::Result<String>> + Send + 'static,
2147{
2148    /// Create a new line stream transport.
2149    pub fn new(outgoing: OutgoingSink, incoming: IncomingStream) -> Self {
2150        Self { outgoing, incoming }
2151    }
2152}
2153
2154impl<OutgoingSink, IncomingStream> Component for Lines<OutgoingSink, IncomingStream>
2155where
2156    OutgoingSink: futures::Sink<String, Error = std::io::Error> + Send + 'static,
2157    IncomingStream: futures::Stream<Item = std::io::Result<String>> + Send + 'static,
2158{
2159    async fn serve(self, client: impl Component) -> Result<(), crate::Error> {
2160        let (channel, serve_self) = self.into_server();
2161        match futures::future::select(Box::pin(client.serve(channel)), serve_self).await {
2162            Either::Left((result, _)) => result,
2163            Either::Right((result, _)) => result,
2164        }
2165    }
2166
2167    fn into_server(self) -> (Channel, BoxFuture<'static, Result<(), crate::Error>>) {
2168        let Self { outgoing, incoming } = self;
2169
2170        // Create a channel pair for the client to use
2171        let (channel_for_caller, channel_for_lines) = Channel::duplex();
2172
2173        // Create the server future that runs the line stream actors
2174        let server_future = Box::pin(async move {
2175            let Channel { rx, tx } = channel_for_lines;
2176
2177            // Run both actors concurrently
2178            let outgoing_future = actors::transport_outgoing_lines_actor(rx, outgoing);
2179            let incoming_future = actors::transport_incoming_lines_actor(incoming, tx);
2180
2181            // Wait for both to complete
2182            futures::try_join!(outgoing_future, incoming_future)?;
2183
2184            Ok(())
2185        });
2186
2187        (channel_for_caller, server_future)
2188    }
2189}
2190
/// A component that communicates over byte streams (stdin/stdout, sockets, pipes, etc.).
///
/// `ByteStreams` implements the [`Component`] trait for any pair of `AsyncRead` and `AsyncWrite`
/// streams, handling serialization of JSON-RPC messages to/from newline-delimited JSON.
/// This is the standard way to communicate with external processes or network connections.
///
/// Internally the byte streams are adapted into line streams and handed to [`Lines`].
///
/// # Use Cases
///
/// - **Stdio communication**: Connect to agents or proxies via stdin/stdout
/// - **Network sockets**: TCP, Unix domain sockets, or other stream-based protocols
/// - **Named pipes**: Cross-process communication on the same machine
/// - **File I/O**: Reading from and writing to file descriptors
///
/// # Example
///
/// Connecting to an agent via stdio:
///
/// ```no_run
/// use sacp::ByteStreams;
/// use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
///
/// # async fn example() -> Result<(), sacp::Error> {
/// let component = ByteStreams::new(
///     tokio::io::stdout().compat_write(),
///     tokio::io::stdin().compat(),
/// );
///
/// // Use as a component in a handler chain
/// sacp::JrHandlerChain::new()
///     .name("my-client")
///     .serve(component)
///     .await?;
/// # Ok(())
/// # }
/// ```
///
/// [`Component`]: crate::Component
pub struct ByteStreams<OB, IB> {
    /// Outgoing byte stream (where we write serialized messages).
    /// Each message is written as one line followed by `\n`.
    pub outgoing: OB,
    /// Incoming byte stream (where we read and parse messages).
    /// It is read line by line through a buffered reader.
    pub incoming: IB,
}
2234
2235impl<OB, IB> ByteStreams<OB, IB>
2236where
2237    OB: AsyncWrite + Send + 'static,
2238    IB: AsyncRead + Send + 'static,
2239{
2240    /// Create a new byte stream transport.
2241    pub fn new(outgoing: OB, incoming: IB) -> Self {
2242        Self { outgoing, incoming }
2243    }
2244}
2245
2246impl<OB, IB> Component for ByteStreams<OB, IB>
2247where
2248    OB: AsyncWrite + Send + 'static,
2249    IB: AsyncRead + Send + 'static,
2250{
2251    async fn serve(self, client: impl Component) -> Result<(), crate::Error> {
2252        let (channel, serve_self) = self.into_server();
2253        match futures::future::select(pin!(client.serve(channel)), serve_self).await {
2254            Either::Left((result, _)) => result,
2255            Either::Right((result, _)) => result,
2256        }
2257    }
2258
2259    fn into_server(self) -> (Channel, BoxFuture<'static, Result<(), crate::Error>>) {
2260        use futures::AsyncBufReadExt;
2261        use futures::AsyncWriteExt;
2262        use futures::io::BufReader;
2263        let Self { outgoing, incoming } = self;
2264
2265        // Convert byte streams to line streams
2266        // Box both streams to satisfy Unpin requirements
2267        let incoming_lines = Box::pin(BufReader::new(incoming).lines());
2268
2269        // Create a sink that writes lines (with newlines) to the outgoing byte stream
2270        // We need to Box the writer since it may not be Unpin
2271        let outgoing_sink =
2272            futures::sink::unfold(Box::pin(outgoing), async move |mut writer, line: String| {
2273                let mut bytes = line.into_bytes();
2274                bytes.push(b'\n');
2275                writer.write_all(&bytes).await?;
2276                Ok::<_, std::io::Error>(writer)
2277            });
2278
2279        // Delegate to Lines component
2280        Lines::new(outgoing_sink, incoming_lines).into_server()
2281    }
2282}
2283
/// A channel endpoint representing one side of a bidirectional message channel.
///
/// `Channel` represents a single endpoint's view of a bidirectional communication channel.
/// Each endpoint has:
/// - `rx`: A receiver for incoming messages (or errors) from the counterpart
/// - `tx`: A sender for outgoing messages (or errors) to the counterpart
///
/// Both directions use unbounded queues, so sending never waits for the receiver.
///
/// # Example
///
/// ```no_run
/// # use sacp::{Channel, JrHandlerChain};
/// # async fn example() -> Result<(), sacp::Error> {
/// // Create a pair of connected channels
/// let (channel_a, channel_b) = Channel::duplex();
///
/// // Each channel can be used by a different component
/// JrHandlerChain::new()
///     .name("connection-a")
///     .serve(channel_a)
///     .await?;
/// # Ok(())
/// # }
/// ```
pub struct Channel {
    /// Receives messages (or errors) from the counterpart.
    pub rx: mpsc::UnboundedReceiver<Result<jsonrpcmsg::Message, crate::Error>>,
    /// Sends messages (or errors) to the counterpart.
    pub tx: mpsc::UnboundedSender<Result<jsonrpcmsg::Message, crate::Error>>,
}
2313
2314impl Channel {
2315    /// Create a pair of connected channel endpoints.
2316    ///
2317    /// Returns two `Channel` instances that are connected to each other:
2318    /// - Messages sent via `channel_a.tx` are received on `channel_b.rx`
2319    /// - Messages sent via `channel_b.tx` are received on `channel_a.rx`
2320    ///
2321    /// # Returns
2322    ///
2323    /// A tuple `(channel_a, channel_b)` of connected channel endpoints.
2324    pub fn duplex() -> (Self, Self) {
2325        // Create channels: A sends Result<Message> which B receives as Message
2326        let (a_tx, b_rx) = mpsc::unbounded();
2327        let (b_tx, a_rx) = mpsc::unbounded();
2328
2329        let channel_a = Self { rx: a_rx, tx: a_tx };
2330        let channel_b = Self { rx: b_rx, tx: b_tx };
2331
2332        (channel_a, channel_b)
2333    }
2334
2335    /// Copy messages from `rx` to `tx`.
2336    ///
2337    /// # Returns
2338    ///
2339    /// A `Result` indicating success or failure.
2340    pub async fn copy(mut self) -> Result<(), crate::Error> {
2341        while let Some(msg) = self.rx.next().await {
2342            self.tx
2343                .unbounded_send(msg)
2344                .map_err(crate::util::internal_error)?;
2345        }
2346        Ok(())
2347    }
2348}
2349
2350impl Component for Channel {
2351    async fn serve(self, client: impl Component) -> Result<(), crate::Error> {
2352        let (client_channel, client_serve) = client.into_server();
2353
2354        match futures::try_join!(
2355            Channel {
2356                rx: client_channel.rx,
2357                tx: self.tx
2358            }
2359            .copy(),
2360            Channel {
2361                rx: self.rx,
2362                tx: client_channel.tx
2363            }
2364            .copy(),
2365            client_serve
2366        ) {
2367            Ok(((), (), ())) => Ok(()),
2368            Err(err) => Err(err),
2369        }
2370    }
2371
2372    fn into_server(self) -> (Channel, BoxFuture<'static, Result<(), crate::Error>>) {
2373        (self, Box::pin(future::ready(Ok(()))))
2374    }
2375}