sacp/jsonrpc.rs
1//! Core JSON-RPC server support.
2
3// Re-export jsonrpcmsg for use in public API
4pub use jsonrpcmsg;
5
6// Types re-exported from crate root
7use serde::{Deserialize, Serialize};
8use std::fmt::Debug;
9use std::panic::Location;
10use std::pin::pin;
11
12use boxfnonce::SendBoxFnOnce;
13use futures::channel::{mpsc, oneshot};
14use futures::future::{self, BoxFuture, Either};
15use futures::{AsyncRead, AsyncWrite, FutureExt, StreamExt};
16
17mod actors;
18pub(crate) mod handlers;
19mod task_actor;
20
21use crate::Component;
22use crate::handler::{ChainedHandler, NamedHandler};
23use crate::jsonrpc::handlers::{MessageHandler, NotificationHandler, NullHandler, RequestHandler};
24use crate::jsonrpc::task_actor::{PendingTask, Task};
25
/// Handlers are invoked when new messages arrive at the [`JrConnection`].
/// They have a chance to inspect the method and parameters and decide whether to "claim" the
/// message (i.e., handle it). If they do not claim it, the message will be passed to the next
/// handler.
// `async fn` in a public trait triggers the `async_fn_in_trait` lint; it is silenced here.
#[allow(async_fn_in_trait)]
pub trait JrMessageHandler {
    /// Attempt to claim an incoming message (request or notification).
    ///
    /// # Important: do not block
    ///
    /// The server will not process new messages until this handler returns.
    /// You should avoid blocking in this callback unless you wish to block the server (e.g., for rate limiting).
    /// The recommended approach to manage expensive operations is to use the
    /// [`JrConnectionCx::spawn`] method available on the message context.
    ///
    /// # Parameters
    ///
    /// * `message` - The incoming message bundled with its context. For a request, the context
    ///   is used to send the reply; for a notification, it is the connection context. Either
    ///   context can also be used to send other messages to the other party.
    ///
    /// # Returns
    ///
    /// * `Ok(Handled::Yes)` if the message was claimed. It will not be propagated further.
    /// * `Ok(Handled::No(message))` if not; the (possibly changed) message will be passed to the remaining handlers.
    /// * `Err` if an internal error occurs (this will bring down the server).
    async fn handle_message(
        &mut self,
        message: MessageAndCx,
    ) -> Result<Handled<MessageAndCx>, crate::Error>;

    /// Returns a debug description of the handler chain for diagnostics.
    fn describe_chain(&self) -> impl std::fmt::Debug;
}
57
58/// A JSON-RPC connection that can act as either a server, client, or both.
59///
/// `JrHandlerChain` provides a builder-style API for creating JSON-RPC servers and clients.
61/// You start by calling [`JrHandlerChain::new`], then add message handlers, and finally
62/// drive the connection with either [`serve`](JrHandlerChain::serve) or [`with_client`](JrHandlerChain::with_client),
63/// providing a component implementation (e.g., [`ByteStreams`] for byte streams).
64///
65/// # JSON-RPC Primer
66///
67/// JSON-RPC 2.0 has two fundamental message types:
68///
69/// * **Requests** - Messages that expect a response. They have an `id` field that gets
70/// echoed back in the response so the sender can correlate them.
71/// * **Notifications** - Fire-and-forget messages with no `id` field. The sender doesn't
72/// expect or receive a response.
73///
74/// # Type-Driven Message Dispatch
75///
76/// The handler registration methods use Rust's type system to determine which messages
77/// to handle. The type parameter you provide controls what gets dispatched to your handler:
78///
79/// ## Single Message Types
80///
81/// The simplest case - handle one specific message type:
82///
83/// ```no_run
84/// # use sacp_test::*;
85/// # use sacp::schema::{InitializeRequest, InitializeResponse, SessionNotification};
86/// # async fn example() -> Result<(), sacp::Error> {
87/// # let connection = mock_connection();
88/// connection
89/// .on_receive_request(async |req: InitializeRequest, cx| {
90/// // Handle only InitializeRequest messages
91/// cx.respond(InitializeResponse::make())
92/// })
93/// .on_receive_notification(async |notif: SessionNotification, cx| {
/// // Handle only SessionNotification messages
95/// Ok(())
96/// })
97/// # .serve(sacp_test::MockTransport).await?;
98/// # Ok(())
99/// # }
100/// ```
101///
102/// ## Enum Message Types
103///
104/// You can also handle multiple related messages with a single handler by defining an enum
105/// that implements the appropriate trait ([`JrRequest`] or [`JrNotification`]):
106///
107/// ```no_run
108/// # use sacp_test::*;
109/// # use sacp::{JrRequest, JrMessage, UntypedMessage};
110/// # use sacp::schema::{InitializeRequest, InitializeResponse, PromptRequest, PromptResponse};
111/// # async fn example() -> Result<(), sacp::Error> {
112/// # let connection = mock_connection();
113/// // Define an enum for multiple request types
114/// #[derive(Debug, Clone)]
115/// enum MyRequests {
116/// Initialize(InitializeRequest),
117/// Prompt(PromptRequest),
118/// }
119///
120/// // Implement JrRequest for your enum
121/// # impl JrMessage for MyRequests {
122/// # fn method(&self) -> &str { "myRequests" }
123/// # fn to_untyped_message(&self) -> Result<UntypedMessage, sacp::Error> { todo!() }
124/// # fn parse_request(_method: &str, _params: &impl serde::Serialize) -> Option<Result<Self, sacp::Error>> { None }
125/// # fn parse_notification(_method: &str, _params: &impl serde::Serialize) -> Option<Result<Self, sacp::Error>> { None }
126/// # }
127/// impl JrRequest for MyRequests { type Response = serde_json::Value; }
128///
129/// // Handle all variants in one place
130/// connection.on_receive_request(async |req: MyRequests, cx| {
131/// match req {
132/// MyRequests::Initialize(init) => { cx.respond(serde_json::json!({})) }
133/// MyRequests::Prompt(prompt) => { cx.respond(serde_json::json!({})) }
134/// }
135/// })
136/// # .serve(sacp_test::MockTransport).await?;
137/// # Ok(())
138/// # }
139/// ```
140///
141/// ## Mixed Message Types
142///
143/// For enums containing both requests AND notifications, use [`on_receive_message`](Self::on_receive_message):
144///
145/// ```no_run
146/// # use sacp_test::*;
147/// # use sacp::MessageAndCx;
148/// # use sacp::schema::{InitializeRequest, InitializeResponse, SessionNotification};
149/// # async fn example() -> Result<(), sacp::Error> {
150/// # let connection = mock_connection();
151/// // on_receive_message receives MessageAndCx which can be either a request or notification
152/// connection.on_receive_message(async |msg: MessageAndCx<InitializeRequest, SessionNotification>| {
153/// match msg {
154/// MessageAndCx::Request(req, request_cx) => {
155/// request_cx.respond(InitializeResponse::make())
156/// }
157/// MessageAndCx::Notification(notif, _cx) => {
158/// Ok(())
159/// }
160/// }
161/// })
162/// # .serve(sacp_test::MockTransport).await?;
163/// # Ok(())
164/// # }
165/// ```
166///
167/// # Handler Registration
168///
169/// Register handlers using these methods (listed from most common to most flexible):
170///
171/// * [`on_receive_request`](Self::on_receive_request) - Handle JSON-RPC requests (messages expecting responses)
172/// * [`on_receive_notification`](Self::on_receive_notification) - Handle JSON-RPC notifications (fire-and-forget)
173/// * [`on_receive_message`](Self::on_receive_message) - Handle enums containing both requests and notifications
174/// * [`with_handler`](Self::with_handler) - Low-level primitive for maximum flexibility
175///
176/// ## Handler Ordering
177///
178/// Handlers are tried in the order you register them. The first handler that claims a message
179/// (by matching its type) will process it. Subsequent handlers won't see that message:
180///
181/// ```no_run
182/// # use sacp_test::*;
183/// # use sacp::{MessageAndCx, UntypedMessage};
184/// # use sacp::schema::{InitializeRequest, InitializeResponse, PromptRequest, PromptResponse};
185/// # async fn example() -> Result<(), sacp::Error> {
186/// # let connection = mock_connection();
187/// connection
188/// .on_receive_request(async |req: InitializeRequest, cx| {
189/// // This runs first for InitializeRequest
190/// cx.respond(InitializeResponse::make())
191/// })
192/// .on_receive_request(async |req: PromptRequest, cx| {
193/// // This runs first for PromptRequest
194/// cx.respond(PromptResponse::make())
195/// })
196/// .on_receive_message(async |msg: MessageAndCx| {
197/// // This runs for any message not handled above
198/// msg.respond_with_error(sacp::util::internal_error("unknown method"))
199/// })
200/// # .serve(sacp_test::MockTransport).await?;
201/// # Ok(())
202/// # }
203/// ```
204///
205/// # Event Loop and Concurrency
206///
207/// Understanding the event loop is critical for writing correct handlers.
208///
209/// ## The Event Loop
210///
211/// `JrConnection` runs all handler callbacks on a single async task - the event loop.
212/// While a handler is running, **the server cannot receive new messages**. This means
213/// any blocking or expensive work in your handlers will stall the entire connection.
214///
215/// To avoid blocking the event loop, use [`JrConnectionCx::spawn`] to offload serious
216/// work to concurrent tasks:
217///
218/// ```no_run
219/// # use sacp_test::*;
220/// # async fn example() -> Result<(), sacp::Error> {
221/// # let connection = mock_connection();
222/// connection.on_receive_request(async |req: AnalyzeRequest, cx| {
223/// // Clone cx for the spawned task
224/// cx.connection_cx().spawn({
225/// let connection_cx = cx.connection_cx();
226/// async move {
227/// let result = expensive_analysis(&req.data).await?;
228/// connection_cx.send_notification(AnalysisComplete { result })?;
229/// Ok(())
230/// }
231/// })?;
232///
233/// // Respond immediately without blocking
234/// cx.respond(AnalysisStarted { job_id: 42 })
235/// })
236/// # .serve(sacp_test::MockTransport).await?;
237/// # Ok(())
238/// # }
239/// ```
240///
241/// Note that the entire connection runs within one async task, so parallelism must be
242/// managed explicitly using [`spawn`](JrConnectionCx::spawn).
243///
244/// ## The Connection Context
245///
246/// Handler callbacks receive a context object (`cx`) for interacting with the connection:
247///
248/// * **For request handlers** - [`JrRequestCx<R>`] provides [`respond`](JrRequestCx::respond)
249/// to send the response, plus methods to send other messages
250/// * **For notification handlers** - [`JrConnectionCx`] provides methods to send messages
251/// and spawn tasks
252///
253/// Both context types support:
254/// * [`send_request`](JrConnectionCx::send_request) - Send requests to the other side
255/// * [`send_notification`](JrConnectionCx::send_notification) - Send notifications
256/// * [`spawn`](JrConnectionCx::spawn) - Run tasks concurrently without blocking the event loop
257///
258/// The [`JrResponse`] returned by `send_request` provides methods like
259/// [`await_when_result_received`](JrResponse::await_when_result_received) that help you
260/// avoid accidentally blocking the event loop while waiting for responses.
261///
262/// # Driving the Connection
263///
264/// After adding handlers, you must drive the connection using one of two modes:
265///
266/// ## Server Mode: `serve()`
267///
268/// Use [`serve`](Self::serve) when you only need to respond to incoming messages:
269///
270/// ```no_run
271/// # use sacp_test::*;
272/// # async fn example() -> Result<(), sacp::Error> {
273/// # let connection = mock_connection();
274/// connection
275/// .on_receive_request(async |req: MyRequest, cx| {
276/// cx.respond(MyResponse { status: "ok".into() })
277/// })
278/// .serve(MockTransport) // Runs until connection closes or error occurs
279/// .await?;
280/// # Ok(())
281/// # }
282/// ```
283///
284/// The connection will process incoming messages and invoke your handlers until the
285/// connection is closed or an error occurs.
286///
287/// ## Client Mode: `with_client()`
288///
289/// Use [`with_client`](Self::with_client) when you need to both handle incoming messages
290/// AND send your own requests/notifications:
291///
292/// ```no_run
293/// # use sacp_test::*;
294/// # use sacp::schema::InitializeRequest;
295/// # async fn example() -> Result<(), sacp::Error> {
296/// # let connection = mock_connection();
297/// connection
298/// .on_receive_request(async |req: MyRequest, cx| {
299/// cx.respond(MyResponse { status: "ok".into() })
300/// })
301/// .with_client(MockTransport, async |cx| {
302/// // You can send requests to the other side
303/// let response = cx.send_request(InitializeRequest::make())
304/// .block_task()
305/// .await?;
306///
307/// // And send notifications
308/// cx.send_notification(StatusUpdate { message: "ready".into() })?;
309///
310/// Ok(())
311/// })
312/// .await?;
313/// # Ok(())
314/// # }
315/// ```
316///
317/// The connection will serve incoming messages in the background while your client closure
318/// runs. When the closure returns, the connection shuts down.
319///
320/// # Example: Complete Agent
321///
322/// ```no_run
323/// # use sacp::JrHandlerChain;
324/// # use sacp::ByteStreams;
325/// # use sacp::schema::{InitializeRequest, InitializeResponse, PromptRequest, PromptResponse, SessionNotification};
326/// # use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
327/// # async fn example() -> Result<(), sacp::Error> {
328/// let transport = ByteStreams::new(
329/// tokio::io::stdout().compat_write(),
330/// tokio::io::stdin().compat(),
331/// );
332///
333/// JrHandlerChain::new()
334/// .name("my-agent") // Optional: for debugging logs
335/// .on_receive_request(async |init: InitializeRequest, cx| {
336/// let response: InitializeResponse = todo!();
337/// cx.respond(response)
338/// })
339/// .on_receive_request(async |prompt: PromptRequest, cx| {
340/// // You can send notifications while processing a request
341/// let notif: SessionNotification = todo!();
342/// cx.connection_cx().send_notification(notif)?;
343///
344/// // Then respond to the request
345/// let response: PromptResponse = todo!();
346/// cx.respond(response)
347/// })
348/// .serve(transport)
349/// .await?;
350/// # Ok(())
351/// # }
352/// ```
// A chain does nothing until it is connected and driven, so an unused value is flagged.
#[must_use]
pub struct JrHandlerChain<H: JrMessageHandler> {
    /// Optional name for this chain; stays `None` unless [`JrHandlerChain::name`] is called.
    name: Option<String>,

    /// Handler for incoming messages.
    handler: H,

    /// Tasks queued on the builder (see [`JrHandlerChain::with_spawned`]); they are
    /// spawned by [`JrHandlerChain::connect_to`].
    pending_tasks: Vec<PendingTask>,
}
363
364impl JrHandlerChain<NullHandler> {
365 /// Create a new JrConnection.
366 /// This type follows a builder pattern; use other methods to configure and then invoke
367 /// [`Self::serve`] (to use as a server) or [`Self::with_client`] to use as a client.
368 pub fn new() -> Self {
369 Self::new_with(NullHandler::default())
370 }
371}
372
373impl<H: JrMessageHandler> JrHandlerChain<H> {
374 /// Create a new handler chain with the given handler.
375 pub fn new_with(handler: H) -> Self {
376 Self {
377 name: Default::default(),
378 handler,
379 pending_tasks: Default::default(),
380 }
381 }
382
383 /// Set the "name" of this connection -- used only for debugging logs.
384 pub fn name(mut self, name: impl ToString) -> Self {
385 self.name = Some(name.to_string());
386 self
387 }
388
389 /// Add a new [`JrMessageHandler`] to the chain.
390 ///
391 /// Prefer [`Self::on_receive_request`] or [`Self::on_receive_notification`].
392 /// This is a low-level method that is not intended for general use.
393 pub fn with_handler_chain<H1>(
394 mut self,
395 handler_chain: JrHandlerChain<H1>,
396 ) -> JrHandlerChain<ChainedHandler<H, NamedHandler<H1>>>
397 where
398 H1: JrMessageHandler,
399 {
400 self.pending_tasks.extend(
401 handler_chain
402 .pending_tasks
403 .into_iter()
404 .map(|t| t.named(handler_chain.name.clone())),
405 );
406
407 JrHandlerChain {
408 name: self.name,
409 handler: ChainedHandler::new(
410 self.handler,
411 NamedHandler::new(handler_chain.name, handler_chain.handler),
412 ),
413 pending_tasks: self.pending_tasks,
414 }
415 }
416
417 /// Add a new [`JrMessageHandler`] to the chain.
418 ///
419 /// Prefer [`Self::on_receive_request`] or [`Self::on_receive_notification`].
420 /// This is a low-level method that is not intended for general use.
421 pub fn with_handler<H1>(self, handler: H1) -> JrHandlerChain<ChainedHandler<H, H1>>
422 where
423 H1: JrMessageHandler,
424 {
425 JrHandlerChain {
426 name: self.name,
427 handler: ChainedHandler::new(self.handler, handler),
428 pending_tasks: self.pending_tasks,
429 }
430 }
431
432 /// Enqueue a task to run once the connection is actively serving traffic.
433 #[track_caller]
434 pub fn with_spawned<F>(
435 mut self,
436 task: impl FnOnce(JrConnectionCx) -> F + Send + 'static,
437 ) -> Self
438 where
439 F: Future<Output = Result<(), crate::Error>> + Send + 'static,
440 {
441 let location = Location::caller();
442 self.pending_tasks.push(PendingTask::new(location, task));
443 self
444 }
445
446 /// Register a handler for messages that can be either requests OR notifications.
447 ///
448 /// Use this when you want to handle an enum type that contains both request and
449 /// notification variants. Your handler receives a [`MessageAndCx<R, N>`] which
450 /// is an enum with two variants:
451 ///
452 /// - `MessageAndCx::Request(request, request_cx)` - A request with its response context
453 /// - `MessageAndCx::Notification(notification, cx)` - A notification with the connection context
454 ///
455 /// # Example
456 ///
457 /// ```no_run
458 /// # use sacp_test::*;
459 /// # use sacp::MessageAndCx;
460 /// # async fn example() -> Result<(), sacp::Error> {
461 /// # let connection = mock_connection();
462 /// connection.on_receive_message(async |message: MessageAndCx<MyRequest, StatusUpdate>| {
463 /// match message {
464 /// MessageAndCx::Request(req, request_cx) => {
465 /// // Handle request and send response
466 /// request_cx.respond(MyResponse { status: "ok".into() })
467 /// }
468 /// MessageAndCx::Notification(notif, _cx) => {
469 /// // Handle notification (no response needed)
470 /// Ok(())
471 /// }
472 /// }
473 /// })
474 /// # .serve(sacp_test::MockTransport).await?;
475 /// # Ok(())
476 /// # }
477 /// ```
478 ///
479 /// For most use cases, prefer [`on_receive_request`](Self::on_receive_request) or
480 /// [`on_receive_notification`](Self::on_receive_notification) which provide cleaner APIs
481 /// for handling requests or notifications separately.
482 pub fn on_receive_message<R, N, F, T>(
483 self,
484 op: F,
485 ) -> JrHandlerChain<ChainedHandler<H, MessageHandler<R, N, F>>>
486 where
487 R: JrRequest,
488 N: JrNotification,
489 F: AsyncFnMut(MessageAndCx<R, N>) -> Result<T, crate::Error> + Send,
490 T: IntoHandled<MessageAndCx<R, N>>,
491 {
492 self.with_handler(MessageHandler::new(op))
493 }
494
495 /// Register a handler for JSON-RPC requests of type `R`.
496 ///
497 /// Your handler receives two arguments:
498 /// 1. The request (type `R`)
499 /// 2. A [`JrRequestCx<R::Response>`] for sending the response
500 ///
501 /// The request context allows you to:
502 /// - Send the response with [`JrRequestCx::respond`]
503 /// - Send notifications to the client with [`JrConnectionCx::send_notification`]
504 /// - Send requests to the client with [`JrConnectionCx::send_request`]
505 ///
506 /// # Example
507 ///
508 /// ```
509 /// # use sacp::JrHandlerChain;
510 /// # use sacp::schema::{PromptRequest, PromptResponse, SessionNotification};
511 /// # fn example(connection: JrHandlerChain<impl sacp::JrMessageHandler>) {
512 /// connection.on_receive_request(async |request: PromptRequest, request_cx| {
513 /// // Send a notification while processing
514 /// let notif: SessionNotification = todo!();
515 /// request_cx.connection_cx().send_notification(notif)?;
516 ///
517 /// // Do some work...
518 /// let result = todo!("process the prompt");
519 ///
520 /// // Send the response
521 /// let response: PromptResponse = todo!();
522 /// request_cx.respond(response)
523 /// });
524 /// # }
525 /// ```
526 ///
527 /// # Type Parameter
528 ///
529 /// `R` can be either a single request type or an enum of multiple request types.
530 /// See the [type-driven dispatch](Self#type-driven-message-dispatch) section for details.
531 pub fn on_receive_request<R, F, T>(
532 self,
533 op: F,
534 ) -> JrHandlerChain<ChainedHandler<H, RequestHandler<R, F>>>
535 where
536 R: JrRequest,
537 F: AsyncFnMut(R, JrRequestCx<R::Response>) -> Result<T, crate::Error> + Send,
538 T: IntoHandled<(R, JrRequestCx<R::Response>)>,
539 {
540 self.with_handler(RequestHandler::new(op))
541 }
542
543 /// Register a handler for JSON-RPC notifications of type `N`.
544 ///
545 /// Notifications are fire-and-forget messages that don't expect a response.
546 /// Your handler receives:
547 /// 1. The notification (type `N`)
548 /// 2. A [`JrConnectionCx`] for sending messages to the other side
549 ///
550 /// Unlike request handlers, you cannot send a response (notifications don't have IDs),
551 /// but you can still send your own requests and notifications using the context.
552 ///
553 /// # Example
554 ///
555 /// ```no_run
556 /// # use sacp_test::*;
557 /// # async fn example() -> Result<(), sacp::Error> {
558 /// # let connection = mock_connection();
559 /// connection.on_receive_notification(async |notif: SessionUpdate, cx| {
560 /// // Process the notification
561 /// update_session_state(¬if)?;
562 ///
563 /// // Optionally send a notification back
564 /// cx.send_notification(StatusUpdate {
565 /// message: "Acknowledged".into(),
566 /// })?;
567 ///
568 /// Ok(())
569 /// })
570 /// # .serve(sacp_test::MockTransport).await?;
571 /// # Ok(())
572 /// # }
573 /// ```
574 ///
575 /// # Type Parameter
576 ///
577 /// `N` can be either a single notification type or an enum of multiple notification types.
578 /// See the [type-driven dispatch](Self#type-driven-message-dispatch) section for details.
579 pub fn on_receive_notification<N, F, T>(
580 self,
581 op: F,
582 ) -> JrHandlerChain<ChainedHandler<H, NotificationHandler<N, F>>>
583 where
584 N: JrNotification,
585 F: AsyncFnMut(N, JrConnectionCx) -> Result<T, crate::Error> + Send,
586 T: IntoHandled<(N, JrConnectionCx)>,
587 {
588 self.with_handler(NotificationHandler::new(op))
589 }
590
591 /// Connect these handlers to a transport layer.
592 /// The resulting connection must then be either [served](`JrConnection::serve`) or [used as a client](`JrConnection::with_client`).
593 pub fn connect_to(
594 self,
595 transport: impl Component + 'static,
596 ) -> Result<JrConnection<H>, crate::Error> {
597 let Self {
598 name,
599 handler,
600 pending_tasks,
601 } = self;
602
603 let (outgoing_tx, outgoing_rx) = mpsc::unbounded();
604 let (new_task_tx, new_task_rx) = mpsc::unbounded();
605 let cx = JrConnectionCx::new(outgoing_tx, new_task_tx);
606
607 // Convert transport into server - this returns a channel for us to use
608 // and a future that runs the transport
609 let transport_component = crate::DynComponent::new(transport);
610 let (transport_channel, transport_future) = transport_component.into_server();
611 cx.spawn(transport_future)?;
612
613 // Destructure the channel endpoints
614 let Channel {
615 rx: transport_incoming_rx,
616 tx: transport_outgoing_tx,
617 } = transport_channel;
618
619 // Spawn pending tasks
620 for pending_task in pending_tasks {
621 let task = pending_task.into_task(cx.clone());
622 cx.spawn_task(task)?;
623 }
624
625 Ok(JrConnection {
626 cx,
627 name,
628 outgoing_rx,
629 new_task_rx,
630 transport_outgoing_tx,
631 transport_incoming_rx,
632 handler,
633 })
634 }
635
636 /// Apply the handler chain to a single message.
637 ///
638 /// This method processes one message through the entire handler chain, attempting to
639 /// match it against each registered handler in order. This is useful when implementing
640 /// custom message handling logic or when you need fine-grained control over message
641 /// processing.
642 ///
643 /// # Returns
644 ///
645 /// - `Ok(Handled::Claimed)` - A handler claimed and processed the message
646 /// - `Ok(Handled::Unclaimed(message))` - No handler matched the message
647 /// - `Err(_)` - A handler encountered an error while processing
648 ///
649 /// # Borrow Checker Considerations
650 ///
651 /// You may find that [`MatchMessage`] is a better choice than this method
652 /// for implementing custom handlers. It offers a very similar API to
653 /// [`JrHandlerChain`] but is structured to apply each test one at a time
654 /// (sequentially) instead of setting them all up at once. This sequential approach
655 /// often interacts better with the borrow checker, at the cost of requiring `.await`
656 /// calls between each handler and only working for processing a single message.
657 ///
658 /// # Example: Borrow Checker Challenges
659 ///
660 /// When building a handler chain with `async {}` blocks (non-move), you might encounter
661 /// borrow checker errors if multiple handlers need access to the same mutable state:
662 ///
663 /// ```compile_fail
664 /// # use sacp::{JrHandlerChain, JrRequestCx};
665 /// # use sacp::schema::{InitializeRequest, InitializeResponse};
666 /// # use sacp::schema::{PromptRequest, PromptResponse};
667 /// # async fn example() -> Result<(), sacp::Error> {
668 /// let mut state = String::from("initial");
669 ///
670 /// // This fails to compile because both handlers borrow `state` mutably,
671 /// // and the futures are set up at the same time (even though only one will run)
672 /// let chain = JrHandlerChain::new()
673 /// .on_receive_request(async |req: InitializeRequest, cx: JrRequestCx| {
674 /// state.push_str(" - initialized"); // First mutable borrow
675 /// cx.respond(InitializeResponse::make())
676 /// })
677 /// .on_receive_request(async |req: PromptRequest, cx: JrRequestCx| {
678 /// state.push_str(" - prompted"); // Second mutable borrow - ERROR!
679 /// cx.respond(PromptResponse { content: vec![], stopReason: None })
680 /// });
681 /// # Ok(())
682 /// # }
683 /// ```
684 ///
685 /// You can work around this by using `apply()` to process messages one at a time,
686 /// or use [`MatchMessage`] which provides a similar API but applies handlers sequentially:
687 ///
688 /// ```ignore
689 /// use sacp::{MessageAndCx, Handled};
690 /// use sacp::util::MatchMessage;
691 ///
692 /// async fn handle_with_state(
693 /// message: MessageAndCx,
694 /// state: &mut String,
695 /// ) -> Result<Handled<MessageAndCx>, sacp::Error> {
696 /// MatchMessage::new(message)
697 /// .on_request(async |req: InitializeRequest, cx| {
698 /// state.push_str(" - initialized"); // Sequential - OK!
699 /// cx.respond(InitializeResponse::make())
700 /// })
701 /// .on_request(async |req: PromptRequest, cx| {
702 /// state.push_str(" - prompted"); // Sequential - OK!
703 /// cx.respond(PromptResponse { content: vec![], stopReason: None })
704 /// })
705 /// .otherwise(async |msg| Ok(Handled::Unclaimed(msg)))
706 /// .await
707 /// }
708 /// ```
709 pub async fn apply(
710 &mut self,
711 message: MessageAndCx,
712 ) -> Result<Handled<MessageAndCx>, crate::Error> {
713 self.handler.handle_message(message).await
714 }
715
716 /// Convenience method to connect to a transport and serve.
717 ///
718 /// This is equivalent to:
719 /// ```ignore
720 /// handler_chain.connect_to(transport)?.serve().await
721 /// ```
722 pub async fn serve(self, transport: impl Component + 'static) -> Result<(), crate::Error> {
723 self.connect_to(transport)?.serve().await
724 }
725
726 /// Convenience method to connect to a transport and run a client function.
727 ///
728 /// This is equivalent to:
729 /// ```ignore
730 /// handler_chain.connect_to(transport)?.with_client(main_fn).await
731 /// ```
732 pub async fn with_client(
733 self,
734 transport: impl Component + 'static,
735 main_fn: impl AsyncFnOnce(JrConnectionCx) -> Result<(), crate::Error>,
736 ) -> Result<(), crate::Error> {
737 self.connect_to(transport)?.with_client(main_fn).await
738 }
739}
740
/// A JSON-RPC connection with an active transport.
///
/// This type represents a `JrHandlerChain` that has been connected to a transport
/// via `connect_to()`. It can be driven in two modes:
///
/// - [`serve()`](Self::serve) - Run as a server, handling incoming messages until the connection closes
/// - [`with_client()`](Self::with_client) - Run with client logic, allowing you to send requests/notifications
///
/// Most users won't construct this directly - instead use `JrHandlerChain::connect_to()` or
/// `JrHandlerChain::serve()` for convenience.
pub struct JrConnection<H: JrMessageHandler> {
    /// Connection context; built in `connect_to()` from the sending halves of the
    /// outgoing-message and new-task channels.
    cx: JrConnectionCx,
    /// Optional connection name, passed to `instrument_with_connection_name` when the
    /// connection is driven.
    name: Option<String>,
    /// Receiving half of the outgoing-message channel; handed to the outgoing protocol actor.
    outgoing_rx: mpsc::UnboundedReceiver<OutgoingMessage>,
    /// Receiving half of the new-task channel whose sender lives in `cx`.
    new_task_rx: mpsc::UnboundedReceiver<Task>,
    /// Sender for messages going to the transport; handed to the outgoing protocol actor.
    transport_outgoing_tx: mpsc::UnboundedSender<Result<jsonrpcmsg::Message, crate::Error>>,
    /// Receiver for messages coming from the transport; handed to the incoming protocol actor.
    transport_incoming_rx: mpsc::UnboundedReceiver<Result<jsonrpcmsg::Message, crate::Error>>,
    /// Handler chain that incoming messages are dispatched to.
    handler: H,
}
760
761impl<H: JrMessageHandler> JrConnection<H> {
762 /// Run the connection in server mode with the provided transport.
763 ///
764 /// This drives the connection by continuously processing messages from the transport
765 /// and dispatching them to your registered handlers. The connection will run until:
766 /// - The transport closes (e.g., EOF on byte streams)
767 /// - An error occurs
768 /// - One of your handlers returns an error
769 ///
770 /// The transport is responsible for serializing and deserializing `jsonrpcmsg::Message`
771 /// values to/from the underlying I/O mechanism (byte streams, channels, etc.).
772 ///
773 /// Use this mode when you only need to respond to incoming messages and don't need
774 /// to initiate your own requests. If you need to send requests to the other side,
775 /// use [`with_client`](Self::with_client) instead.
776 ///
777 /// # Example: Byte Stream Transport
778 ///
779 /// ```no_run
780 /// # use sacp::JrHandlerChain;
781 /// # use sacp::ByteStreams;
782 /// # use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
783 /// # use sacp_test::*;
784 /// # async fn example() -> Result<(), sacp::Error> {
785 /// let transport = ByteStreams::new(
786 /// tokio::io::stdout().compat_write(),
787 /// tokio::io::stdin().compat(),
788 /// );
789 ///
790 /// JrHandlerChain::new()
791 /// .on_receive_request(async |req: MyRequest, cx| {
792 /// cx.respond(MyResponse { status: "ok".into() })
793 /// })
794 /// .serve(transport)
795 /// .await?;
796 /// # Ok(())
797 /// # }
798 /// ```
799 pub async fn serve(self) -> Result<(), crate::Error> {
800 self.with_client(async move |_cx| future::pending().await)
801 .await
802 }
803
/// Run the connection in client mode, both handling incoming messages and sending your own.
///
/// This drives the connection by:
/// 1. Running your registered handlers in the background to process incoming messages
/// 2. Executing your `main_fn` closure with a [`JrConnectionCx`] for sending requests/notifications
///
/// The connection stays active until your `main_fn` returns, then shuts down gracefully.
/// If the connection closes unexpectedly before `main_fn` completes, this returns an error.
///
/// Use this mode when you need to initiate communication (send requests/notifications)
/// in addition to responding to incoming messages. For server-only mode where you just
/// respond to messages, use [`serve`](Self::serve) instead.
///
/// # Example
///
/// ```no_run
/// # use sacp::JrHandlerChain;
/// # use sacp::ByteStreams;
/// # use sacp::schema::InitializeRequest;
/// # use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
/// # use sacp_test::*;
/// # async fn example() -> Result<(), sacp::Error> {
/// let transport = ByteStreams::new(
///     tokio::io::stdout().compat_write(),
///     tokio::io::stdin().compat(),
/// );
///
/// JrHandlerChain::new()
///     .on_receive_request(async |req: MyRequest, cx| {
///         // Handle incoming requests in the background
///         cx.respond(MyResponse { status: "ok".into() })
///     })
///     .connect_to(transport)?
///     .with_client(async |cx| {
///         // Initialize the protocol
///         let init_response = cx.send_request(InitializeRequest::make())
///             .block_task()
///             .await?;
///
///         // Send more requests...
///         let result = cx.send_request(MyRequest {})
///             .block_task()
///             .await?;
///
///         // When this closure returns, the connection shuts down
///         Ok(())
///     })
///     .await?;
/// # Ok(())
/// # }
/// ```
///
/// # Parameters
///
/// - `main_fn`: Your client logic. Receives a [`JrConnectionCx`] for sending messages.
///
/// The transport is not a parameter of this method; in the example above it is
/// supplied earlier, when the connection is created with `connect_to`.
///
/// # Errors
///
/// Returns an error if the connection closes before `main_fn` completes.
/// More generally, whichever of the internal actors or `main_fn` finishes first
/// decides the outcome: its error is propagated, otherwise `Ok(())` is returned.
pub async fn with_client(
    self,
    main_fn: impl AsyncFnOnce(JrConnectionCx) -> Result<(), crate::Error>,
) -> Result<(), crate::Error> {
    // Take the connection apart so each piece can be handed to the actor that uses it.
    let JrConnection {
        cx,
        name,
        outgoing_rx,
        new_task_rx,
        handler,
        transport_outgoing_tx,
        transport_incoming_rx,
    } = self;
    // Channel feeding the reply actor; both protocol actors receive a sender for it.
    let (reply_tx, reply_rx) = mpsc::unbounded();

    crate::util::instrument_with_connection_name(name, async move {
        // Race every actor against `main_fn`: the first branch to complete ends the
        // `select!`, and `r?` turns that branch's error into this function's error.
        futures::select!(
            // Protocol layer: OutgoingMessage → jsonrpcmsg::Message
            r = actors::outgoing_protocol_actor(
                outgoing_rx,
                reply_tx.clone(),
                transport_outgoing_tx,
            ).fuse() => {
                tracing::trace!(?r, "outgoing protocol actor terminated");
                r?;
            }
            // Protocol layer: jsonrpcmsg::Message → handler/reply routing
            r = actors::incoming_protocol_actor(
                &cx,
                transport_incoming_rx,
                reply_tx,
                handler,
            ).fuse() => {
                tracing::trace!(?r, "incoming protocol actor terminated");
                r?;
            }
            // Consumes the `ReplyMessage`s produced through `reply_tx`.
            r = actors::reply_actor(reply_rx).fuse() => {
                tracing::trace!(?r, "reply actor terminated");
                r?;
            }
            // Receives tasks from `new_task_rx`
            // (presumably those queued via `JrConnectionCx::spawn` — confirm in `task_actor`).
            r = task_actor::task_actor(new_task_rx).fuse() => {
                tracing::trace!(?r, "task actor terminated");
                r?;
            }
            // The caller's own logic, given a clone of the connection context.
            r = main_fn(cx.clone()).fuse() => {
                tracing::trace!(?r, "main actor terminated");
                r?;
            }
        );
        // Reached only when the branch that finished first did so without an error.
        Ok(())
    })
    .await
}
917}
918
/// Message sent to the reply management actor.
///
/// Both variants are keyed by the JSON-RPC id of the request being answered:
/// `Subscribe` supplies the channel on which the response should be delivered,
/// `Dispatch` supplies the response itself.
enum ReplyMessage {
    /// Wait for a response to the given id and then deliver it through the given sender.
    Subscribe(
        jsonrpcmsg::Id,
        oneshot::Sender<Result<serde_json::Value, crate::Error>>,
    ),

    /// Dispatch a response (success value or error) for the given id.
    Dispatch(jsonrpcmsg::Id, Result<serde_json::Value, crate::Error>),
}
930
/// Messages sent to be serialized over the transport.
///
/// Values of this type are queued on the `message_tx` channel of [`JrConnectionCx`].
#[derive(Debug)]
enum OutgoingMessage {
    /// Send a request to the other side.
    Request {
        /// method to use in the request
        method: String,

        /// parameters for the request (`None` when there are no parameters)
        params: Option<jsonrpcmsg::Params>,

        /// where to send the response (or error) when it arrives
        response_tx: oneshot::Sender<Result<serde_json::Value, crate::Error>>,
    },

    /// Send a notification to the other side.
    Notification {
        /// method to use in the notification
        method: String,

        /// parameters for the notification (`None` when there are no parameters)
        params: Option<jsonrpcmsg::Params>,
    },

    /// Send a response to a request received from the other side
    Response {
        /// id of the request being answered
        id: jsonrpcmsg::Id,

        /// the JSON result on success, or the error to report
        response: Result<serde_json::Value, crate::Error>,
    },

    /// Send a generalized error message (one that is not tied to a request id)
    Error { error: crate::Error },
}
965
/// Return type from a [`JrMessageHandler`]; indicates whether the message was handled or not.
///
/// Marked `#[must_use]` so that a handler's verdict cannot be dropped silently.
#[must_use]
pub enum Handled<T> {
    /// The message was handled
    Yes,
    /// The message was not handled; carries the value to hand on
    /// (the message, possibly changed by the handler)
    No(T),
}
974
/// Trait for converting handler return values into [`Handled`].
///
/// This trait allows handlers to return either `()` (which becomes `Handled::Yes`)
/// or an explicit `Handled<T>` value for more control over handler chain propagation.
///
/// `T` is the type carried by [`Handled::No`].
pub trait IntoHandled<T> {
    /// Convert this value into a `Handled<T>`, consuming it.
    fn into_handled(self) -> Handled<T>;
}
983
/// Returning `()` from a handler means the message was handled.
impl<T> IntoHandled<T> for () {
    /// Always yields [`Handled::Yes`].
    fn into_handled(self) -> Handled<T> {
        Handled::Yes
    }
}
989
/// An explicit [`Handled`] value is passed through unchanged.
impl<T> IntoHandled<T> for Handled<T> {
    /// Returns `self` as-is.
    fn into_handled(self) -> Handled<T> {
        self
    }
}
995
// NOTE(review): the trait this comment describes is not defined at this point in the
// file. The text is kept as a plain comment so that rustdoc does not prepend it to the
// documentation of `JrConnectionCx` below; move it next to the trait it belongs to.
//
// Trait for types that can provide transport for JSON-RPC messages.
//
// Implementations of this trait bridge between the internal protocol channels
// (which carry `jsonrpcmsg::Message`) and the actual I/O mechanism (byte streams,
// in-process channels, network sockets, etc.).
//
// The transport layer is responsible only for moving `jsonrpcmsg::Message` in and out.
// It has no knowledge of protocol semantics like request/response correlation, ID assignment,
// or handler dispatch - those are handled by the protocol layer in `JrConnection`.
//
// # Example
//
// See `ByteStreams` for the standard byte stream implementation.
1009
/// Connection context for sending messages and spawning tasks.
///
/// This is the primary handle for interacting with the JSON-RPC connection from
/// within handler callbacks. You can use it to:
///
/// * Send requests and notifications to the other side
/// * Spawn concurrent tasks that run alongside the connection
/// * Respond to requests (via [`JrRequestCx`] which wraps this)
///
/// # Cloning
///
/// `JrConnectionCx` is cheaply cloneable - all clones refer to the same underlying connection.
/// This makes it easy to share across async tasks.
///
/// # Event Loop and Concurrency
///
/// Handler callbacks run on the event loop, which means the connection cannot process new
/// messages while your handler is running. Use [`spawn`](Self::spawn) to offload any
/// expensive or blocking work to concurrent tasks.
///
/// See the [Event Loop and Concurrency](JrConnection#event-loop-and-concurrency) section
/// for more details.
#[derive(Clone, Debug)]
pub struct JrConnectionCx {
    /// Sending half of the channel on which outgoing messages
    /// (requests, notifications, responses, errors) are queued.
    message_tx: mpsc::UnboundedSender<OutgoingMessage>,
    /// Sending half of the channel on which spawned tasks are queued.
    task_tx: mpsc::UnboundedSender<Task>,
}
1037
1038impl JrConnectionCx {
1039 fn new(
1040 tx: mpsc::UnboundedSender<OutgoingMessage>,
1041 task_tx: mpsc::UnboundedSender<Task>,
1042 ) -> Self {
1043 Self {
1044 message_tx: tx,
1045 task_tx,
1046 }
1047 }
1048
1049 /// Spawns a task that will run so long as the JSON-RPC connection is being served.
1050 ///
1051 /// This is the primary mechanism for offloading expensive work from handler callbacks
1052 /// to avoid blocking the event loop. Spawned tasks run concurrently with the connection,
1053 /// allowing the server to continue processing messages.
1054 ///
1055 /// # Event Loop
1056 ///
1057 /// Handler callbacks run on the event loop, which cannot process new messages while
1058 /// your handler is running. Use `spawn` for any expensive operations:
1059 ///
1060 /// ```no_run
1061 /// # use sacp_test::*;
1062 /// # async fn example() -> Result<(), sacp::Error> {
1063 /// # let connection = mock_connection();
1064 /// connection.on_receive_request(async |req: ProcessRequest, cx| {
1065 /// // Clone cx for the spawned task
1066 /// cx.connection_cx().spawn({
1067 /// let connection_cx = cx.connection_cx();
1068 /// async move {
1069 /// let result = expensive_operation(&req.data).await?;
1070 /// connection_cx.send_notification(ProcessComplete { result })?;
1071 /// Ok(())
1072 /// }
1073 /// })?;
1074 ///
1075 /// // Respond immediately
1076 /// cx.respond(ProcessResponse { result: "started".into() })
1077 /// })
1078 /// # .serve(sacp_test::MockTransport).await?;
1079 /// # Ok(())
1080 /// # }
1081 /// ```
1082 ///
1083 /// # Errors
1084 ///
1085 /// If the spawned task returns an error, the entire server will shut down.
1086 #[track_caller]
1087 pub fn spawn(
1088 &self,
1089 task: impl IntoFuture<Output = Result<(), crate::Error>, IntoFuture: Send + 'static>,
1090 ) -> Result<(), crate::Error> {
1091 let location = std::panic::Location::caller();
1092 let task = task.into_future();
1093 self.spawn_task(Task::new(location, task))
1094 }
1095
1096 fn spawn_task(&self, task: Task) -> Result<(), crate::Error> {
1097 self.task_tx
1098 .unbounded_send(task)
1099 .map_err(crate::util::internal_error)
1100 }
1101
1102 /// Spawn a JSON-RPC connection in the background and return a [`JrConnectionCx`] for sending messages to it.
1103 ///
1104 /// This is useful for creating multiple connections that communicate with each other,
1105 /// such as implementing proxy patterns or connecting to multiple backend services.
1106 ///
1107 /// # Arguments
1108 ///
1109 /// - `connection`: The `JrConnection` to spawn (typically created via `JrHandlerChain::connect_to()`)
1110 /// - `serve_future`: A function that drives the connection (usually `|c| Box::pin(c.serve())`)
1111 ///
1112 /// # Returns
1113 ///
1114 /// A `JrConnectionCx` that you can use to send requests and notifications to the spawned connection.
1115 ///
1116 /// # Example: Proxying to a backend connection
1117 ///
1118 /// ```
1119 /// # use sacp::{JrHandlerChain, JrConnectionCx};
1120 /// # use sacp_test::*;
1121 /// # async fn example(cx: JrConnectionCx) -> Result<(), sacp::Error> {
1122 /// // Set up a backend connection
1123 /// let backend = JrHandlerChain::new()
1124 /// .on_receive_request(async |req: MyRequest, request_cx| {
1125 /// request_cx.respond(MyResponse { status: "ok".into() })
1126 /// })
1127 /// .connect_to(MockTransport)?;
1128 ///
1129 /// // Spawn it and get a context to send requests to it
1130 /// let backend_cx = cx.spawn_connection(backend, |c| Box::pin(c.serve()))?;
1131 ///
1132 /// // Now you can forward requests to the backend
1133 /// let response = backend_cx.send_request(MyRequest {}).block_task().await?;
1134 /// # Ok(())
1135 /// # }
1136 /// ```
1137 #[track_caller]
1138 pub fn spawn_connection<H: JrMessageHandler>(
1139 &self,
1140 connection: JrConnection<H>,
1141 serve_future: impl FnOnce(JrConnection<H>) -> BoxFuture<'static, Result<(), crate::Error>>,
1142 ) -> Result<JrConnectionCx, crate::Error> {
1143 let cx = connection.cx.clone();
1144 let future = serve_future(connection);
1145 let task = Task::new(std::panic::Location::caller(), future);
1146 self.spawn_task(task)?;
1147 Ok(cx)
1148 }
1149
1150 /// Send a request/notification and forward the response appropriately.
1151 ///
1152 /// The request context's response type matches the request's response type,
1153 /// enabling type-safe message forwarding.
1154 pub fn send_proxied_message<R, N>(
1155 &self,
1156 message: MessageAndCx<R, N>,
1157 ) -> Result<(), crate::Error>
1158 where
1159 R: JrRequest<Response: Send>,
1160 N: JrNotification,
1161 {
1162 match message {
1163 MessageAndCx::Request(request, request_cx) => {
1164 self.send_request(request).forward_to_request_cx(request_cx)
1165 }
1166 MessageAndCx::Notification(notification, _) => self.send_notification(notification),
1167 }
1168 }
1169
1170 /// Send an outgoing request and return a [`JrResponse`] for handling the reply.
1171 ///
1172 /// The returned [`JrResponse`] provides methods for receiving the response without
1173 /// blocking the event loop:
1174 ///
1175 /// * [`await_when_result_received`](JrResponse::await_when_result_received) - Schedule
1176 /// a callback to run when the response arrives (doesn't block the event loop)
1177 /// * [`block_task`](JrResponse::block_task) - Block the current task until the response
1178 /// arrives (only safe in spawned tasks, not in handlers)
1179 ///
1180 /// # Anti-Footgun Design
1181 ///
1182 /// The API intentionally makes it difficult to block on the result directly to prevent
1183 /// the common mistake of blocking the event loop while waiting for a response:
1184 ///
1185 /// ```compile_fail
1186 /// # use sacp_test::*;
1187 /// # async fn example(cx: sacp::JrConnectionCx) -> Result<(), sacp::Error> {
1188 /// // ❌ This doesn't compile - prevents blocking the event loop
1189 /// let response = cx.send_request(MyRequest {}).await?;
1190 /// # Ok(())
1191 /// # }
1192 /// ```
1193 ///
1194 /// ```no_run
1195 /// # use sacp_test::*;
1196 /// # async fn example(cx: sacp::JrConnectionCx) -> Result<(), sacp::Error> {
1197 /// // ✅ Option 1: Schedule callback (safe in handlers)
1198 /// cx.send_request(MyRequest {})
1199 /// .await_when_result_received(async |result| {
1200 /// // Handle the response
1201 /// Ok(())
1202 /// })?;
1203 ///
1204 /// // ✅ Option 2: Block in spawned task (safe because task is concurrent)
1205 /// cx.spawn({
1206 /// let cx = cx.clone();
1207 /// async move {
1208 /// let response = cx.send_request(MyRequest {})
1209 /// .block_task()
1210 /// .await?;
1211 /// // Process response...
1212 /// Ok(())
1213 /// }
1214 /// })?;
1215 /// # Ok(())
1216 /// # }
1217 /// ```
1218 pub fn send_request<Req: JrRequest>(&self, request: Req) -> JrResponse<Req::Response> {
1219 let method = request.method().to_string();
1220 let (response_tx, response_rx) = oneshot::channel();
1221 match request.to_untyped_message() {
1222 Ok(untyped) => {
1223 let params = crate::util::json_cast(untyped.params).ok();
1224 let message = OutgoingMessage::Request {
1225 method: method.clone(),
1226 params,
1227 response_tx,
1228 };
1229
1230 match self.message_tx.unbounded_send(message) {
1231 Ok(()) => (),
1232 Err(error) => {
1233 let OutgoingMessage::Request {
1234 method,
1235 response_tx,
1236 ..
1237 } = error.into_inner()
1238 else {
1239 unreachable!();
1240 };
1241
1242 response_tx
1243 .send(Err(communication_failure(format!(
1244 "failed to send outgoing request `{method}"
1245 ))))
1246 .unwrap();
1247 }
1248 }
1249 }
1250
1251 Err(_) => {
1252 response_tx
1253 .send(Err(communication_failure(format!(
1254 "failed to create untyped request for `{method}"
1255 ))))
1256 .unwrap();
1257 }
1258 }
1259
1260 JrResponse::new(method.clone(), self.clone(), response_rx)
1261 .map(move |json| <Req::Response>::from_value(&method, json))
1262 }
1263
1264 /// Send an outgoing notification (no reply expected).
1265 ///
1266 /// Notifications are fire-and-forget messages that don't have IDs and don't expect responses.
1267 /// This method sends the notification immediately and returns.
1268 ///
1269 /// ```no_run
1270 /// # use sacp_test::*;
1271 /// # async fn example(cx: sacp::JrConnectionCx) -> Result<(), sacp::Error> {
1272 /// cx.send_notification(StatusUpdate {
1273 /// message: "Processing...".into(),
1274 /// })?;
1275 /// # Ok(())
1276 /// # }
1277 /// ```
1278 pub fn send_notification<N: JrNotification>(
1279 &self,
1280 notification: N,
1281 ) -> Result<(), crate::Error> {
1282 let untyped = notification.to_untyped_message()?;
1283 let params = crate::util::json_cast(untyped.params).ok();
1284 self.send_raw_message(OutgoingMessage::Notification {
1285 method: untyped.method,
1286 params,
1287 })
1288 }
1289
1290 /// Send an error notification (no reply expected).
1291 pub fn send_error_notification(&self, error: crate::Error) -> Result<(), crate::Error> {
1292 self.send_raw_message(OutgoingMessage::Error { error })
1293 }
1294
1295 fn send_raw_message(&self, message: OutgoingMessage) -> Result<(), crate::Error> {
1296 match &message {
1297 OutgoingMessage::Response { id, response } => match response {
1298 Ok(_) => tracing::debug!(?id, "send_raw_message: queuing success response"),
1299 Err(e) => tracing::warn!(?id, ?e, "send_raw_message: queuing error response"),
1300 },
1301 _ => {}
1302 }
1303 self.message_tx
1304 .unbounded_send(message)
1305 .map_err(communication_failure)
1306 }
1307}
1308
/// The context to respond to an incoming request.
///
/// This context is provided to request handlers and serves a dual role:
///
/// 1. **Respond to the request** - Use [`respond`](Self::respond) or
///    [`respond_with_result`](Self::respond_with_result) to send the response
/// 2. **Send other messages** - Use [`connection_cx`](Self::connection_cx) to access the
///    underlying [`JrConnectionCx`], giving access to
///    [`send_request`](JrConnectionCx::send_request),
///    [`send_notification`](JrConnectionCx::send_notification), and
///    [`spawn`](JrConnectionCx::spawn)
///
/// # Example
///
/// ```no_run
/// # use sacp_test::*;
/// # async fn example() -> Result<(), sacp::Error> {
/// # let connection = mock_connection();
/// connection.on_receive_request(async |req: ProcessRequest, cx| {
///     // Send a notification while processing
///     cx.connection_cx().send_notification(StatusUpdate {
///         message: "processing".into(),
///     })?;
///
///     // Do some work...
///     let result = process(&req.data)?;
///
///     // Respond to the request
///     cx.respond(ProcessResponse { result })
/// })
/// # .serve(sacp_test::MockTransport).await?;
/// # Ok(())
/// # }
/// ```
///
/// # Event Loop Considerations
///
/// Like all handlers, request handlers run on the event loop. Use
/// [`spawn`](JrConnectionCx::spawn) for expensive operations to avoid blocking
/// the connection.
///
/// See the [Event Loop and Concurrency](JrConnection#event-loop-and-concurrency)
/// section for more details.
#[must_use]
pub struct JrRequestCx<T: JrResponsePayload> {
    /// The context to use to send outgoing messages and replies.
    cx: JrConnectionCx,

    /// The method of the request.
    method: String,

    /// The `id` of the message we are replying to.
    id: jsonrpcmsg::Id,

    /// Conversion applied when responding: given the method name and the handler's
    /// result of type `T` (or an error), produces the JSON value (or error) that is
    /// sent back as the response.
    make_json: SendBoxFnOnce<
        'static,
        (String, Result<T, crate::Error>),
        Result<serde_json::Value, crate::Error>,
    >,
}
1370
1371impl<T: JrResponsePayload> std::fmt::Debug for JrRequestCx<T> {
1372 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1373 f.debug_struct("JrRequestCx")
1374 .field("cx", &self.cx)
1375 .field("method", &self.method)
1376 .field("id", &self.id)
1377 .field("response_type", &std::any::type_name::<T>())
1378 .finish()
1379 }
1380}
1381
1382impl JrRequestCx<serde_json::Value> {
1383 /// Create a new method context.
1384 fn new(cx: &JrConnectionCx, method: String, id: jsonrpcmsg::Id) -> Self {
1385 Self {
1386 cx: cx.clone(),
1387 method,
1388 id,
1389 make_json: SendBoxFnOnce::new(move |_method, value| value),
1390 }
1391 }
1392
1393 /// Cast this request context to a different response type
1394 pub fn cast<T: JrResponsePayload>(self) -> JrRequestCx<T> {
1395 self.wrap_params(move |method, value| match value {
1396 Ok(value) => T::into_json(value, &method),
1397 Err(e) => Err(e),
1398 })
1399 }
1400}
1401
1402impl<T: JrResponsePayload> JrRequestCx<T> {
1403 /// Method of the incoming request
1404 pub fn method(&self) -> &str {
1405 &self.method
1406 }
1407
1408 /// ID of the incoming request as a JSON value
1409 pub fn id(&self) -> serde_json::Value {
1410 match &self.id {
1411 jsonrpcmsg::Id::Number(n) => serde_json::Value::Number((*n).into()),
1412 jsonrpcmsg::Id::String(s) => serde_json::Value::String(s.clone()),
1413 jsonrpcmsg::Id::Null => serde_json::Value::Null,
1414 }
1415 }
1416
1417 /// Convert to a `JrRequestCx` that expects a JSON value
1418 /// and which checks (dynamically) that the JSON value it receives
1419 /// can be converted to `T`.
1420 pub fn erase_to_json(self) -> JrRequestCx<serde_json::Value> {
1421 self.wrap_params(|method, value| T::from_value(&method, value?))
1422 }
1423
1424 /// Return a new JrResponse that expects a response of type U and serializes it.
1425 pub fn wrap_method(self, method: String) -> JrRequestCx<T> {
1426 JrRequestCx {
1427 cx: self.cx,
1428 method,
1429 id: self.id,
1430 make_json: self.make_json,
1431 }
1432 }
1433
1434 /// Return a new JrResponse that expects a response of type U and serializes it.
1435 ///
1436 /// `wrap_fn` will be invoked with the method name and the result of the wrapped function.
1437 pub fn wrap_params<U: JrResponsePayload>(
1438 self,
1439 wrap_fn: impl FnOnce(&str, Result<U, crate::Error>) -> Result<T, crate::Error> + Send + 'static,
1440 ) -> JrRequestCx<U> {
1441 JrRequestCx {
1442 cx: self.cx,
1443 method: self.method,
1444 id: self.id,
1445 make_json: SendBoxFnOnce::new(move |method: String, input: Result<U, crate::Error>| {
1446 let t_value = wrap_fn(&method, input);
1447 self.make_json.call(method, t_value)
1448 }),
1449 }
1450 }
1451
1452 /// Get the underlying JSON RPC context.
1453 pub fn connection_cx(&self) -> JrConnectionCx {
1454 self.cx.clone()
1455 }
1456
1457 /// Respond to the JSON-RPC request with either a value (`Ok`) or an error (`Err`).
1458 pub fn respond_with_result(
1459 self,
1460 response: Result<T, crate::Error>,
1461 ) -> Result<(), crate::Error> {
1462 tracing::debug!(id = ?self.id, "respond called");
1463 let json = self.make_json.call_tuple((self.method.clone(), response));
1464 self.cx.send_raw_message(OutgoingMessage::Response {
1465 id: self.id,
1466 response: json,
1467 })
1468 }
1469
1470 /// Respond to the JSON-RPC request with a value.
1471 pub fn respond(self, response: T) -> Result<(), crate::Error> {
1472 self.respond_with_result(Ok(response))
1473 }
1474
1475 /// Respond to the JSON-RPC request with an internal error containing a message.
1476 pub fn respond_with_internal_error(self, message: impl ToString) -> Result<(), crate::Error> {
1477 self.respond_with_error(crate::util::internal_error(message))
1478 }
1479
1480 /// Respond to the JSON-RPC request with an error.
1481 pub fn respond_with_error(self, error: crate::Error) -> Result<(), crate::Error> {
1482 tracing::debug!(id = ?self.id, ?error, "respond_with_error called");
1483 self.respond_with_result(Err(error))
1484 }
1485}
1486
/// Common bounds for any JSON-RPC message.
///
/// Implemented by request and notification types alike; see [`JrRequest`] and
/// [`JrNotification`] for the two refinements.
pub trait JrMessage: 'static + Debug + Sized + Send + Clone {
    /// Convert this message into its untyped form: the method name plus
    /// the parameters as raw JSON.
    fn to_untyped_message(&self) -> Result<UntypedMessage, crate::Error>;

    /// The method name for the message.
    fn method(&self) -> &str;

    /// Attempt to parse this type from a JSON-RPC request.
    ///
    /// Returns:
    /// - `None` if this type does not recognize the method name or recognizes it as a notification
    /// - `Some(Ok(value))` if the method is recognized as a request and deserialization succeeds
    /// - `Some(Err(error))` if the method is recognized as a request but deserialization fails
    fn parse_request(_method: &str, _params: &impl Serialize)
    -> Option<Result<Self, crate::Error>>;

    /// Attempt to parse this type from a JSON-RPC notification.
    ///
    /// Returns:
    /// - `None` if this type does not recognize the method name or recognizes it as a request
    /// - `Some(Ok(value))` if the method is recognized as a notification and deserialization succeeds
    /// - `Some(Err(error))` if the method is recognized as a notification but deserialization fails
    fn parse_notification(
        _method: &str,
        _params: &impl Serialize,
    ) -> Option<Result<Self, crate::Error>>;
}
1515
/// Defines the "payload" of a successful response to a JSON-RPC request.
///
/// A payload can be converted to JSON and parsed back from JSON; both directions
/// receive the name of the method the response belongs to.
pub trait JrResponsePayload: 'static + Debug + Sized + Send {
    /// Convert this payload into a JSON value.
    ///
    /// `method` is the method of the request being answered.
    fn into_json(self, method: &str) -> Result<serde_json::Value, crate::Error>;

    /// Parse a JSON value into the response type.
    ///
    /// `method` is the method of the request that was answered.
    fn from_value(method: &str, value: serde_json::Value) -> Result<Self, crate::Error>;
}
1524
/// Raw JSON is its own payload: both conversions are the identity and never fail.
impl JrResponsePayload for serde_json::Value {
    /// Returns `value` unchanged; the method name is not used.
    fn from_value(_method: &str, value: serde_json::Value) -> Result<Self, crate::Error> {
        Ok(value)
    }

    /// Returns `self` unchanged; the method name is not used.
    fn into_json(self, _method: &str) -> Result<serde_json::Value, crate::Error> {
        Ok(self)
    }
}
1534
/// A struct that represents a notification (JSON-RPC message that does not expect a response).
///
/// This is a marker trait: it adds no methods beyond those of [`JrMessage`].
pub trait JrNotification: JrMessage {}
1537
/// A struct that represents a request (JSON-RPC message expecting a response).
///
/// In addition to the [`JrMessage`] behavior, a request names the type of its response.
pub trait JrRequest: JrMessage {
    /// The type of data expected in response.
    type Response: JrResponsePayload;
}
1543
/// An enum capturing an in-flight request or notification.
/// In the case of a request, also includes the context used to respond to the request.
///
/// Type parameters allow specifying the concrete request and notification types.
/// By default, both are `UntypedMessage` for dynamic dispatch.
/// The request context's response type matches the request's response type.
#[derive(Debug)]
pub enum MessageAndCx<R: JrRequest = UntypedMessage, N: JrMessage = UntypedMessage> {
    /// Incoming request and the context where the response should be sent.
    Request(R, JrRequestCx<R::Response>),

    /// Incoming notification, together with the connection context that
    /// can be used to send messages in reaction to it.
    Notification(N, JrConnectionCx),
}
1558
1559impl<R: JrRequest, N: JrMessage> MessageAndCx<R, N> {
1560 /// Map the request and notification types to new types.
1561 pub fn map<R1, N1>(
1562 self,
1563 map_request: impl FnOnce(R, JrRequestCx<R::Response>) -> (R1, JrRequestCx<R1::Response>),
1564 map_notification: impl FnOnce(N, JrConnectionCx) -> (N1, JrConnectionCx),
1565 ) -> MessageAndCx<R1, N1>
1566 where
1567 R1: JrRequest<Response: Send>,
1568 N1: JrMessage,
1569 {
1570 match self {
1571 MessageAndCx::Request(request, cx) => {
1572 let (new_request, new_cx) = map_request(request, cx);
1573 MessageAndCx::Request(new_request, new_cx)
1574 }
1575 MessageAndCx::Notification(notification, cx) => {
1576 let (new_notification, new_cx) = map_notification(notification, cx);
1577 MessageAndCx::Notification(new_notification, new_cx)
1578 }
1579 }
1580 }
1581
1582 /// Respond to the message with an error.
1583 ///
1584 /// If this message is a request, this error becomes the reply to the request.
1585 ///
1586 /// If this message is a notification, the error is sent as a notification.
1587 pub fn respond_with_error(self, error: crate::Error) -> Result<(), crate::Error> {
1588 match self {
1589 MessageAndCx::Request(_, cx) => cx.respond_with_error(error),
1590 MessageAndCx::Notification(_, cx) => cx.send_error_notification(error),
1591 }
1592 }
1593
1594 /// Convert to a `JrRequestCx` that expects a JSON value
1595 /// and which checks (dynamically) that the JSON value it receives
1596 /// can be converted to `T`.
1597 pub fn erase_to_json(self) -> Result<MessageAndCx, crate::Error> {
1598 match self {
1599 MessageAndCx::Request(response, request_cx) => Ok(MessageAndCx::Request(
1600 response.to_untyped_message()?,
1601 request_cx.erase_to_json(),
1602 )),
1603 MessageAndCx::Notification(notification, cx) => Ok(MessageAndCx::Notification(
1604 notification.to_untyped_message()?,
1605 cx,
1606 )),
1607 }
1608 }
1609
1610 /// Convert the message to an untyped message.
1611 pub fn to_untyped_message(&self) -> Result<UntypedMessage, crate::Error> {
1612 match self {
1613 MessageAndCx::Request(request, _) => request.to_untyped_message(),
1614 MessageAndCx::Notification(notification, _) => notification.to_untyped_message(),
1615 }
1616 }
1617
1618 /// Returns the request ID if this is a request, None if notification.
1619 pub fn id(&self) -> Option<serde_json::Value> {
1620 match self {
1621 MessageAndCx::Request(_, cx) => Some(cx.id()),
1622 MessageAndCx::Notification(_, _) => None,
1623 }
1624 }
1625}
1626
1627impl MessageAndCx {
1628 /// Returns the method of the message (only available for UntypedMessage).
1629 pub fn method(&self) -> &str {
1630 match self {
1631 MessageAndCx::Request(msg, _) => &msg.method,
1632 MessageAndCx::Notification(msg, _) => &msg.method,
1633 }
1634 }
1635
1636 /// Returns the message of the message (only available for UntypedMessage).
1637 pub fn message(&self) -> &UntypedMessage {
1638 match self {
1639 MessageAndCx::Request(msg, _) => msg,
1640 MessageAndCx::Notification(msg, _) => msg,
1641 }
1642 }
1643}
1644
/// An incoming JSON message without any typing. Can be a request or a notification.
///
/// Holds only the method name and the parameters; it carries no request id.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct UntypedMessage {
    /// The JSON-RPC method name
    pub method: String,
    /// The JSON-RPC parameters as a raw JSON value
    pub params: serde_json::Value,
}
1653
1654impl UntypedMessage {
1655 /// Returns an untyped message with the given method and parameters.
1656 pub fn new(method: &str, params: impl Serialize) -> Result<Self, crate::Error> {
1657 let params = serde_json::to_value(params)?;
1658 Ok(Self {
1659 method: method.to_string(),
1660 params,
1661 })
1662 }
1663
1664 /// Returns the method name
1665 pub fn method(&self) -> &str {
1666 &self.method
1667 }
1668
1669 /// Returns the parameters as a JSON value
1670 pub fn params(&self) -> &serde_json::Value {
1671 &self.params
1672 }
1673
1674 /// Consumes this message and returns the method and params
1675 pub fn into_parts(self) -> (String, serde_json::Value) {
1676 (self.method, self.params)
1677 }
1678}
1679
1680impl JrMessage for UntypedMessage {
1681 fn method(&self) -> &str {
1682 &self.method
1683 }
1684
1685 fn to_untyped_message(&self) -> Result<UntypedMessage, crate::Error> {
1686 Ok(self.clone())
1687 }
1688
1689 fn parse_request(method: &str, params: &impl Serialize) -> Option<Result<Self, crate::Error>> {
1690 Some(UntypedMessage::new(method, params))
1691 }
1692
1693 fn parse_notification(
1694 method: &str,
1695 params: &impl Serialize,
1696 ) -> Option<Result<Self, crate::Error>> {
1697 Some(UntypedMessage::new(method, params))
1698 }
1699}
1700
/// An untyped message can be used as a request; its response is raw JSON.
impl JrRequest for UntypedMessage {
    type Response = serde_json::Value;
}
1704
/// An untyped message can also be used as a notification.
impl JrNotification for UntypedMessage {}
1706
/// Represents a pending response of type `R` from an outgoing request.
///
/// Returned by [`JrConnectionCx::send_request`], this type provides methods for handling
/// the response without blocking the event loop. The API is intentionally designed to make
/// it difficult to accidentally block.
///
/// # Anti-Footgun Design
///
/// You cannot directly `.await` a `JrResponse`. Instead, you must choose how to handle
/// the response:
///
/// ## Option 1: Schedule a Callback (Safe in Handlers)
///
/// Use [`await_when_result_received`](Self::await_when_result_received) to schedule a task
/// that runs when the response arrives. This doesn't block the event loop:
///
/// ```no_run
/// # use sacp_test::*;
/// # async fn example(cx: sacp::JrConnectionCx) -> Result<(), sacp::Error> {
/// cx.send_request(MyRequest {})
///     .await_when_result_received(async |result| {
///         match result {
///             Ok(response) => {
///                 // Handle successful response
///                 Ok(())
///             }
///             Err(error) => {
///                 // Handle error
///                 Err(error)
///             }
///         }
///     })?;
/// # Ok(())
/// # }
/// ```
///
/// ## Option 2: Block in a Spawned Task (Safe Only in `spawn`)
///
/// Use [`block_task`](Self::block_task) to block until the response arrives, but **only**
/// in a spawned task (never in a handler):
///
/// ```no_run
/// # use sacp_test::*;
/// # async fn example(cx: sacp::JrConnectionCx) -> Result<(), sacp::Error> {
/// // ✅ Safe: Spawned task runs concurrently
/// cx.spawn({
///     let cx = cx.clone();
///     async move {
///         let response = cx.send_request(MyRequest {})
///             .block_task()
///             .await?;
///         // Process response...
///         Ok(())
///     }
/// })?;
/// # Ok(())
/// # }
/// ```
///
/// ```no_run
/// # use sacp_test::*;
/// # async fn example() -> Result<(), sacp::Error> {
/// # let connection = mock_connection();
/// // ❌ NEVER do this in a handler - blocks the event loop!
/// connection.on_receive_request(async |req: MyRequest, cx| {
///     let response = cx.connection_cx().send_request(MyRequest {})
///         .block_task()  // This will deadlock!
///         .await?;
///     cx.respond(response)
/// })
/// # .serve(sacp_test::MockTransport).await?;
/// # Ok(())
/// # }
/// ```
///
/// # Why This Design?
///
/// If you block the event loop while waiting for a response, the connection cannot process
/// the incoming response message, creating a deadlock. This API design prevents that footgun
/// by making blocking explicit and encouraging non-blocking patterns.
pub struct JrResponse<R> {
    /// The method of the request this is a response to.
    method: String,
    /// Context of the connection the request was sent on.
    connection_cx: JrConnectionCx,
    /// Receives the raw JSON result (or the error) for the request.
    response_rx: oneshot::Receiver<Result<serde_json::Value, crate::Error>>,
    /// Converts the raw JSON result into the typed response `R`.
    to_result: Box<dyn Fn(serde_json::Value) -> Result<R, crate::Error> + Send>,
}
1793
1794impl JrResponse<serde_json::Value> {
1795 fn new(
1796 method: String,
1797 connection_cx: JrConnectionCx,
1798 response_rx: oneshot::Receiver<Result<serde_json::Value, crate::Error>>,
1799 ) -> Self {
1800 Self {
1801 method,
1802 response_rx,
1803 connection_cx,
1804 to_result: Box::new(Ok),
1805 }
1806 }
1807}
1808
1809impl<R: JrResponsePayload> JrResponse<R> {
1810 /// The method of the request this is in response to.
1811 pub fn method(&self) -> &str {
1812 &self.method
1813 }
1814
1815 /// Create a new response that maps the result of the response to a new type.
1816 pub fn map<U>(
1817 self,
1818 map_fn: impl Fn(R) -> Result<U, crate::Error> + 'static + Send,
1819 ) -> JrResponse<U>
1820 where
1821 U: JrResponsePayload,
1822 {
1823 JrResponse {
1824 method: self.method,
1825 response_rx: self.response_rx,
1826 connection_cx: self.connection_cx,
1827 to_result: Box::new(move |value| map_fn((self.to_result)(value)?)),
1828 }
1829 }
1830
1831 /// Forward the response (success or error) to a request context when it arrives.
1832 ///
1833 /// This is a convenience method for proxying messages between connections. When the
1834 /// response arrives, it will be automatically sent to the provided request context,
1835 /// whether it's a successful response or an error.
1836 ///
1837 /// # Example: Proxying requests
1838 ///
1839 /// ```
1840 /// # use sacp::{JrHandlerChain, JrConnectionCx};
1841 /// # use sacp_test::*;
1842 /// # async fn example(cx: JrConnectionCx) -> Result<(), sacp::Error> {
1843 /// // Set up backend connection
1844 /// let backend = JrHandlerChain::new()
1845 /// .on_receive_request(async |req: MyRequest, request_cx| {
1846 /// request_cx.respond(MyResponse { status: "ok".into() })
1847 /// })
1848 /// .connect_to(MockTransport)?;
1849 ///
1850 /// // Spawn backend and get a context to send to it
1851 /// let backend_cx = cx.spawn_connection(backend, |c| Box::pin(c.serve()))?;
1852 ///
1853 /// // Set up proxy that forwards requests to backend
1854 /// JrHandlerChain::new()
1855 /// .on_receive_request({
1856 /// let backend_cx = backend_cx.clone();
1857 /// async move |req: MyRequest, request_cx| {
1858 /// // Forward the request to backend and proxy the response back
1859 /// backend_cx.send_request(req)
1860 /// .forward_to_request_cx(request_cx)?;
1861 /// Ok(())
1862 /// }
1863 /// });
1864 /// # Ok(())
1865 /// # }
1866 /// ```
1867 ///
1868 /// # Type Safety
1869 ///
1870 /// The request context's response type must match the request's response type,
1871 /// ensuring type-safe message forwarding.
1872 ///
1873 /// # When to Use
1874 ///
1875 /// Use this when:
1876 /// - You're implementing a proxy or gateway pattern
1877 /// - You want to forward responses without processing them
1878 /// - The response types match between the outgoing request and incoming request
1879 ///
1880 /// This is equivalent to calling `await_when_result_received` and manually forwarding
1881 /// the result, but more concise.
1882 pub fn forward_to_request_cx(self, request_cx: JrRequestCx<R>) -> Result<(), crate::Error>
1883 where
1884 R: Send,
1885 {
1886 self.await_when_result_received(async move |result| request_cx.respond_with_result(result))
1887 }
1888
1889 /// Block the current task until the response is received.
1890 ///
1891 /// **Warning:** This method blocks the current async task. It is **only safe** to use
1892 /// in spawned tasks created with [`JrConnectionCx::spawn`]. Using it directly in a
1893 /// handler callback will deadlock the connection.
1894 ///
1895 /// # Safe Usage (in spawned tasks)
1896 ///
1897 /// ```no_run
1898 /// # use sacp_test::*;
1899 /// # async fn example() -> Result<(), sacp::Error> {
1900 /// # let connection = mock_connection();
1901 /// connection.on_receive_request(async |req: MyRequest, cx| {
1902 /// // Spawn a task to handle the request
1903 /// cx.connection_cx().spawn({
1904 /// let connection_cx = cx.connection_cx();
1905 /// async move {
1906 /// // Safe: We're in a spawned task, not blocking the event loop
1907 /// let response = connection_cx.send_request(OtherRequest {})
1908 /// .block_task()
1909 /// .await?;
1910 ///
1911 /// // Process the response...
1912 /// Ok(())
1913 /// }
1914 /// })?;
1915 ///
1916 /// // Respond immediately
1917 /// cx.respond(MyResponse { status: "ok".into() })
1918 /// })
1919 /// # .serve(sacp_test::MockTransport).await?;
1920 /// # Ok(())
1921 /// # }
1922 /// ```
1923 ///
1924 /// # Unsafe Usage (in handlers - will deadlock!)
1925 ///
1926 /// ```no_run
1927 /// # use sacp_test::*;
1928 /// # async fn example() -> Result<(), sacp::Error> {
1929 /// # let connection = mock_connection();
1930 /// connection.on_receive_request(async |req: MyRequest, cx| {
1931 /// // ❌ DEADLOCK: Handler blocks event loop, which can't process the response
1932 /// let response = cx.connection_cx().send_request(OtherRequest {})
1933 /// .block_task()
1934 /// .await?;
1935 ///
1936 /// cx.respond(MyResponse { status: response.value })
1937 /// })
1938 /// # .serve(sacp_test::MockTransport).await?;
1939 /// # Ok(())
1940 /// # }
1941 /// ```
1942 ///
1943 /// # When to Use
1944 ///
1945 /// Use this method when:
1946 /// - You're in a spawned task (via [`JrConnectionCx::spawn`])
1947 /// - You need the response value to proceed with your logic
1948 /// - Linear control flow is more natural than callbacks
1949 ///
1950 /// For handler callbacks, use [`await_when_result_received`](Self::await_when_result_received) instead.
1951 pub async fn block_task(self) -> Result<R, crate::Error>
1952 where
1953 R: Send,
1954 {
1955 match self.response_rx.await {
1956 Ok(Ok(json_value)) => match (self.to_result)(json_value) {
1957 Ok(value) => Ok(value),
1958 Err(err) => Err(err),
1959 },
1960 Ok(Err(err)) => Err(err),
1961 Err(err) => Err(crate::util::internal_error(format!(
1962 "response to `{}` never received: {}",
1963 self.method, err
1964 ))),
1965 }
1966 }
1967
1968 /// Schedule an async task to run when a successful response is received.
1969 ///
1970 /// This is a convenience wrapper around [`await_when_result_received`](Self::await_when_result_received)
1971 /// for the common pattern of forwarding errors to a request context while only processing
1972 /// successful responses.
1973 ///
1974 /// # Behavior
1975 ///
1976 /// - If the response is `Ok(value)`, your task receives the value and the request context
1977 /// - If the response is `Err(error)`, the error is automatically sent to `request_cx`
1978 /// and your task is not called
1979 ///
1980 /// # Example: Chaining requests
1981 ///
1982 /// ```no_run
1983 /// # use sacp_test::*;
1984 /// # async fn example() -> Result<(), sacp::Error> {
1985 /// # let connection = mock_connection();
1986 /// connection.on_receive_request(async |req: ValidateRequest, request_cx| {
1987 /// // Send initial request
1988 /// request_cx.connection_cx().send_request(ValidateRequest { data: req.data.clone() })
1989 /// .await_when_ok_response_received(request_cx, async |validation, request_cx| {
1990 /// // Only runs if validation succeeded
1991 /// if validation.is_valid {
1992 /// // Respond to original request
1993 /// request_cx.respond(ValidateResponse { is_valid: true, error: None })
1994 /// } else {
1995 /// request_cx.respond_with_error(sacp::util::internal_error("validation failed"))
1996 /// }
1997 /// })?;
1998 ///
1999 /// Ok(())
2000 /// })
2001 /// # .serve(sacp_test::MockTransport).await?;
2002 /// # Ok(())
2003 /// # }
2004 /// ```
2005 ///
2006 /// # When to Use
2007 ///
2008 /// Use this when:
2009 /// - You need to respond to a request based on another request's result
2010 /// - You want errors to automatically propagate to the request context
2011 /// - You only care about the success case
2012 ///
2013 /// For more control over error handling, use [`await_when_result_received`](Self::await_when_result_received).
2014 #[track_caller]
2015 pub fn await_when_ok_response_received<F>(
2016 self,
2017 request_cx: JrRequestCx<R>,
2018 task: impl FnOnce(R, JrRequestCx<R>) -> F + 'static + Send,
2019 ) -> Result<(), crate::Error>
2020 where
2021 F: Future<Output = Result<(), crate::Error>> + 'static + Send,
2022 R: Send,
2023 {
2024 self.await_when_result_received(async move |result| match result {
2025 Ok(value) => task(value, request_cx).await,
2026 Err(err) => request_cx.respond_with_error(err),
2027 })
2028 }
2029
2030 /// Schedule an async task to run when the response is received.
2031 ///
2032 /// This is the recommended way to handle responses in handler callbacks, as it doesn't
2033 /// block the event loop. The task will be spawned automatically when the response arrives.
2034 ///
2035 /// # Example: Handle response in callback
2036 ///
2037 /// ```no_run
2038 /// # use sacp_test::*;
2039 /// # async fn example() -> Result<(), sacp::Error> {
2040 /// # let connection = mock_connection();
2041 /// connection.on_receive_request(async |req: MyRequest, cx| {
2042 /// // Send a request and schedule a callback for the response
2043 /// cx.connection_cx().send_request(QueryRequest { id: 22 })
2044 /// .await_when_result_received({
2045 /// let connection_cx = cx.connection_cx();
2046 /// async move |result| {
2047 /// match result {
2048 /// Ok(response) => {
2049 /// println!("Got response: {:?}", response);
2050 /// // Can send more messages here
2051 /// connection_cx.send_notification(QueryComplete {})?;
2052 /// Ok(())
2053 /// }
2054 /// Err(error) => {
2055 /// eprintln!("Request failed: {}", error);
2056 /// Err(error)
2057 /// }
2058 /// }
2059 /// }
2060 /// })?;
2061 ///
2062 /// // Handler continues immediately without waiting
2063 /// cx.respond(MyResponse { status: "processing".into() })
2064 /// })
2065 /// # .serve(sacp_test::MockTransport).await?;
2066 /// # Ok(())
2067 /// # }
2068 /// ```
2069 ///
2070 /// # Event Loop Safety
2071 ///
2072 /// Unlike [`block_task`](Self::block_task), this method is safe to use in handlers because
2073 /// it schedules the task to run later rather than blocking the current task. The event loop
2074 /// remains free to process messages, including the response itself.
2075 ///
2076 /// # Error Handling
2077 ///
2078 /// If the scheduled task returns `Err`, the entire server will shut down. Make sure to handle
2079 /// errors appropriately within your task.
2080 ///
2081 /// # When to Use
2082 ///
2083 /// Use this method when:
2084 /// - You're in a handler callback (not a spawned task)
2085 /// - You want to process the response asynchronously
2086 /// - You don't need the response value immediately
2087 ///
2088 /// For spawned tasks where you need linear control flow, consider [`block_task`](Self::block_task).
2089 #[track_caller]
2090 pub fn await_when_result_received<F>(
2091 self,
2092 task: impl FnOnce(Result<R, crate::Error>) -> F + 'static + Send,
2093 ) -> Result<(), crate::Error>
2094 where
2095 F: Future<Output = Result<(), crate::Error>> + 'static + Send,
2096 R: Send,
2097 {
2098 let connection_cx = self.connection_cx.clone();
2099 let block_task = self.block_task();
2100 connection_cx.spawn(async move { task(block_task.await).await })
2101 }
2102}
2103
2104const COMMUNICATION_FAILURE: i32 = -32000;
2105
2106fn communication_failure(err: impl ToString) -> crate::Error {
2107 crate::Error::new((COMMUNICATION_FAILURE, err.to_string()))
2108}
2109
2110// ============================================================================
// Transport components (`Component` implementations)
2112// ============================================================================
2113
2114/// A component that communicates over line streams.
2115///
2116/// `Lines` implements the [`Component`] trait for any pair of line-based streams
2117/// (a `Stream<Item = io::Result<String>>` for incoming and a `Sink<String>` for outgoing),
2118/// handling serialization of JSON-RPC messages to/from newline-delimited JSON.
2119///
2120/// This is a lower-level primitive than [`ByteStreams`] that enables interception and
2121/// transformation of individual lines before they are parsed or after they are serialized.
2122/// This is particularly useful for debugging, logging, or implementing custom line-based
2123/// protocols.
2124///
2125/// # Use Cases
2126///
2127/// - **Line-by-line logging**: Intercept and log each line before parsing
2128/// - **Custom protocols**: Transform lines before/after JSON-RPC processing
2129/// - **Debugging**: Inspect raw message strings
2130/// - **Line filtering**: Skip or modify specific messages
2131///
2132/// Most users should use [`ByteStreams`] instead, which provides a simpler interface
2133/// for byte-based I/O.
2134///
2135/// [`Component`]: crate::Component
pub struct Lines<OutgoingSink, IncomingStream> {
    /// Sink that receives each serialized JSON-RPC message as a single line.
    pub outgoing: OutgoingSink,
    /// Stream that yields the raw lines to be parsed as JSON-RPC messages.
    pub incoming: IncomingStream,
}
2142
2143impl<OutgoingSink, IncomingStream> Lines<OutgoingSink, IncomingStream>
2144where
2145 OutgoingSink: futures::Sink<String, Error = std::io::Error> + Send + 'static,
2146 IncomingStream: futures::Stream<Item = std::io::Result<String>> + Send + 'static,
2147{
2148 /// Create a new line stream transport.
2149 pub fn new(outgoing: OutgoingSink, incoming: IncomingStream) -> Self {
2150 Self { outgoing, incoming }
2151 }
2152}
2153
2154impl<OutgoingSink, IncomingStream> Component for Lines<OutgoingSink, IncomingStream>
2155where
2156 OutgoingSink: futures::Sink<String, Error = std::io::Error> + Send + 'static,
2157 IncomingStream: futures::Stream<Item = std::io::Result<String>> + Send + 'static,
2158{
2159 async fn serve(self, client: impl Component) -> Result<(), crate::Error> {
2160 let (channel, serve_self) = self.into_server();
2161 match futures::future::select(Box::pin(client.serve(channel)), serve_self).await {
2162 Either::Left((result, _)) => result,
2163 Either::Right((result, _)) => result,
2164 }
2165 }
2166
2167 fn into_server(self) -> (Channel, BoxFuture<'static, Result<(), crate::Error>>) {
2168 let Self { outgoing, incoming } = self;
2169
2170 // Create a channel pair for the client to use
2171 let (channel_for_caller, channel_for_lines) = Channel::duplex();
2172
2173 // Create the server future that runs the line stream actors
2174 let server_future = Box::pin(async move {
2175 let Channel { rx, tx } = channel_for_lines;
2176
2177 // Run both actors concurrently
2178 let outgoing_future = actors::transport_outgoing_lines_actor(rx, outgoing);
2179 let incoming_future = actors::transport_incoming_lines_actor(incoming, tx);
2180
2181 // Wait for both to complete
2182 futures::try_join!(outgoing_future, incoming_future)?;
2183
2184 Ok(())
2185 });
2186
2187 (channel_for_caller, server_future)
2188 }
2189}
2190
2191/// A component that communicates over byte streams (stdin/stdout, sockets, pipes, etc.).
2192///
2193/// `ByteStreams` implements the [`Component`] trait for any pair of `AsyncRead` and `AsyncWrite`
2194/// streams, handling serialization of JSON-RPC messages to/from newline-delimited JSON.
2195/// This is the standard way to communicate with external processes or network connections.
2196///
2197/// # Use Cases
2198///
2199/// - **Stdio communication**: Connect to agents or proxies via stdin/stdout
2200/// - **Network sockets**: TCP, Unix domain sockets, or other stream-based protocols
2201/// - **Named pipes**: Cross-process communication on the same machine
2202/// - **File I/O**: Reading from and writing to file descriptors
2203///
2204/// # Example
2205///
2206/// Connecting to an agent via stdio:
2207///
2208/// ```no_run
2209/// use sacp::ByteStreams;
2210/// use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
2211///
2212/// # async fn example() -> Result<(), sacp::Error> {
2213/// let component = ByteStreams::new(
2214/// tokio::io::stdout().compat_write(),
2215/// tokio::io::stdin().compat(),
2216/// );
2217///
2218/// // Use as a component in a handler chain
2219/// sacp::JrHandlerChain::new()
2220/// .name("my-client")
2221/// .serve(component)
2222/// .await?;
2223/// # Ok(())
2224/// # }
2225/// ```
2226///
2227/// [`Component`]: crate::Component
pub struct ByteStreams<OB, IB> {
    /// Byte stream that serialized messages are written to.
    pub outgoing: OB,
    /// Byte stream that incoming messages are read and parsed from.
    pub incoming: IB,
}
2234
2235impl<OB, IB> ByteStreams<OB, IB>
2236where
2237 OB: AsyncWrite + Send + 'static,
2238 IB: AsyncRead + Send + 'static,
2239{
2240 /// Create a new byte stream transport.
2241 pub fn new(outgoing: OB, incoming: IB) -> Self {
2242 Self { outgoing, incoming }
2243 }
2244}
2245
2246impl<OB, IB> Component for ByteStreams<OB, IB>
2247where
2248 OB: AsyncWrite + Send + 'static,
2249 IB: AsyncRead + Send + 'static,
2250{
2251 async fn serve(self, client: impl Component) -> Result<(), crate::Error> {
2252 let (channel, serve_self) = self.into_server();
2253 match futures::future::select(pin!(client.serve(channel)), serve_self).await {
2254 Either::Left((result, _)) => result,
2255 Either::Right((result, _)) => result,
2256 }
2257 }
2258
2259 fn into_server(self) -> (Channel, BoxFuture<'static, Result<(), crate::Error>>) {
2260 use futures::AsyncBufReadExt;
2261 use futures::AsyncWriteExt;
2262 use futures::io::BufReader;
2263 let Self { outgoing, incoming } = self;
2264
2265 // Convert byte streams to line streams
2266 // Box both streams to satisfy Unpin requirements
2267 let incoming_lines = Box::pin(BufReader::new(incoming).lines());
2268
2269 // Create a sink that writes lines (with newlines) to the outgoing byte stream
2270 // We need to Box the writer since it may not be Unpin
2271 let outgoing_sink =
2272 futures::sink::unfold(Box::pin(outgoing), async move |mut writer, line: String| {
2273 let mut bytes = line.into_bytes();
2274 bytes.push(b'\n');
2275 writer.write_all(&bytes).await?;
2276 Ok::<_, std::io::Error>(writer)
2277 });
2278
2279 // Delegate to Lines component
2280 Lines::new(outgoing_sink, incoming_lines).into_server()
2281 }
2282}
2283
2284/// A channel endpoint representing one side of a bidirectional message channel.
2285///
2286/// `Channel` represents a single endpoint's view of a bidirectional communication channel.
2287/// Each endpoint has:
2288/// - `rx`: A receiver for incoming messages (or errors) from the counterpart
2289/// - `tx`: A sender for outgoing messages (or errors) to the counterpart
2290///
2291/// # Example
2292///
2293/// ```no_run
2294/// # use sacp::{Channel, JrHandlerChain};
2295/// # async fn example() -> Result<(), sacp::Error> {
2296/// // Create a pair of connected channels
2297/// let (channel_a, channel_b) = Channel::duplex();
2298///
2299/// // Each channel can be used by a different component
2300/// JrHandlerChain::new()
2301/// .name("connection-a")
2302/// .serve(channel_a)
2303/// .await?;
2304/// # Ok(())
2305/// # }
2306/// ```
2307pub struct Channel {
2308 /// Receives messages (or errors) from the counterpart.
2309 pub rx: mpsc::UnboundedReceiver<Result<jsonrpcmsg::Message, crate::Error>>,
2310 /// Sends messages (or errors) to the counterpart.
2311 pub tx: mpsc::UnboundedSender<Result<jsonrpcmsg::Message, crate::Error>>,
2312}
2313
2314impl Channel {
2315 /// Create a pair of connected channel endpoints.
2316 ///
2317 /// Returns two `Channel` instances that are connected to each other:
2318 /// - Messages sent via `channel_a.tx` are received on `channel_b.rx`
2319 /// - Messages sent via `channel_b.tx` are received on `channel_a.rx`
2320 ///
2321 /// # Returns
2322 ///
2323 /// A tuple `(channel_a, channel_b)` of connected channel endpoints.
2324 pub fn duplex() -> (Self, Self) {
2325 // Create channels: A sends Result<Message> which B receives as Message
2326 let (a_tx, b_rx) = mpsc::unbounded();
2327 let (b_tx, a_rx) = mpsc::unbounded();
2328
2329 let channel_a = Self { rx: a_rx, tx: a_tx };
2330 let channel_b = Self { rx: b_rx, tx: b_tx };
2331
2332 (channel_a, channel_b)
2333 }
2334
2335 /// Copy messages from `rx` to `tx`.
2336 ///
2337 /// # Returns
2338 ///
2339 /// A `Result` indicating success or failure.
2340 pub async fn copy(mut self) -> Result<(), crate::Error> {
2341 while let Some(msg) = self.rx.next().await {
2342 self.tx
2343 .unbounded_send(msg)
2344 .map_err(crate::util::internal_error)?;
2345 }
2346 Ok(())
2347 }
2348}
2349
2350impl Component for Channel {
2351 async fn serve(self, client: impl Component) -> Result<(), crate::Error> {
2352 let (client_channel, client_serve) = client.into_server();
2353
2354 match futures::try_join!(
2355 Channel {
2356 rx: client_channel.rx,
2357 tx: self.tx
2358 }
2359 .copy(),
2360 Channel {
2361 rx: self.rx,
2362 tx: client_channel.tx
2363 }
2364 .copy(),
2365 client_serve
2366 ) {
2367 Ok(((), (), ())) => Ok(()),
2368 Err(err) => Err(err),
2369 }
2370 }
2371
2372 fn into_server(self) -> (Channel, BoxFuture<'static, Result<(), crate::Error>>) {
2373 (self, Box::pin(future::ready(Ok(()))))
2374 }
2375}