sacp/jsonrpc.rs
1//! Core JSON-RPC server support.
2
3use agent_client_protocol_schema::SessionId;
4// Re-export jsonrpcmsg for use in public API
5pub use jsonrpcmsg;
6
7// Types re-exported from crate root
8use serde::{Deserialize, Serialize};
9use std::fmt::Debug;
10use std::panic::Location;
11use std::pin::pin;
12use uuid::Uuid;
13
14use boxfnonce::SendBoxFnOnce;
15use futures::channel::{mpsc, oneshot};
16use futures::future::{self, BoxFuture, Either};
17use futures::{AsyncRead, AsyncWrite, StreamExt};
18
19mod dynamic_handler;
20pub(crate) mod handlers;
21mod incoming_actor;
22mod outgoing_actor;
23pub(crate) mod responder;
24mod task_actor;
25mod transport_actor;
26
27use crate::jsonrpc::dynamic_handler::DynamicHandlerMessage;
28pub use crate::jsonrpc::handlers::NullHandler;
29use crate::jsonrpc::handlers::{ChainedHandler, NamedHandler};
30use crate::jsonrpc::handlers::{MessageHandler, NotificationHandler, RequestHandler};
31use crate::jsonrpc::outgoing_actor::{OutgoingMessageTx, send_raw_message};
32use crate::jsonrpc::responder::SpawnedResponder;
33use crate::jsonrpc::responder::{ChainResponder, JrResponder, NullResponder};
34use crate::jsonrpc::task_actor::{Task, TaskTx};
35use crate::link::{HasDefaultPeer, HasPeer, JrLink};
36use crate::mcp_server::McpServer;
37use crate::peer::JrPeer;
38use crate::{AgentPeer, ClientPeer, Component};
39
40/// Handlers process incoming JSON-RPC messages on a [`JrConnection`].
41///
42/// When messages arrive, they flow through a chain of handlers. Each handler can
43/// either **claim** the message (handle it) or **decline** it (pass to the next handler).
44///
45/// # Message Flow
46///
47/// Messages flow through three layers of handlers in order:
48///
49/// ```text
50/// ┌─────────────────────────────────────────────────────────────────┐
51/// │ Incoming Message │
52/// └─────────────────────────────────────────────────────────────────┘
53/// │
54/// ▼
55/// ┌─────────────────────────────────────────────────────────────────┐
56/// │ 1. User Handlers (registered via on_receive_request, etc.) │
57/// │ - Tried in registration order │
58/// │ - First handler to return Handled::Yes claims the message │
59/// └─────────────────────────────────────────────────────────────────┘
60/// │ Handled::No
61/// ▼
62/// ┌─────────────────────────────────────────────────────────────────┐
63/// │ 2. Dynamic Handlers (added at runtime) │
64/// │ - Used for session-specific message handling │
65/// │ - Added via JrConnectionCx::add_dynamic_handler │
66/// └─────────────────────────────────────────────────────────────────┘
67/// │ Handled::No
68/// ▼
69/// ┌─────────────────────────────────────────────────────────────────┐
70/// │ 3. Role Default Handler │
71/// │ - Fallback based on the connection's JrLink │
72/// │ - Handles protocol-level messages (e.g., proxy forwarding) │
73/// └─────────────────────────────────────────────────────────────────┘
74/// │ Handled::No
75/// ▼
76/// ┌─────────────────────────────────────────────────────────────────┐
77/// │ Unhandled: Error response sent (or queued if retry=true) │
78/// └─────────────────────────────────────────────────────────────────┘
79/// ```
80///
81/// # The `Handled` Return Value
82///
83/// Each handler returns [`Handled`] to indicate whether it processed the message:
84///
85/// - **`Handled::Yes`** - Message was handled. No further handlers are invoked.
86/// - **`Handled::No { message, retry }`** - Message was not handled. The message
87/// (possibly modified) is passed to the next handler in the chain.
88///
89/// For convenience, handlers can return `()` which is equivalent to `Handled::Yes`.
90///
91/// # The Retry Mechanism
92///
93/// The `retry` flag in `Handled::No` controls what happens when no handler claims a message:
94///
95/// - **`retry: false`** (default) - Send a "method not found" error response immediately.
96/// - **`retry: true`** - Queue the message and retry it when new dynamic handlers are added.
97///
98/// This mechanism exists because of a timing issue with sessions: when a `session/new`
99/// response is being processed, the dynamic handler for that session hasn't been registered
100/// yet, but `session/update` notifications for that session may already be arriving.
101/// By setting `retry: true`, these early notifications are queued until the session's
102/// dynamic handler is added.
103///
104/// # Handler Registration
105///
106/// Most users register handlers using the builder methods on [`JrConnectionBuilder`]:
107///
108/// ```
109/// # use sacp::{AgentToClient, ClientToAgent, Component};
110/// # use sacp::schema::{InitializeRequest, InitializeResponse, AgentCapabilities};
111/// # use sacp_test::StatusUpdate;
112/// # async fn example(transport: impl Component<ClientToAgent>) -> Result<(), sacp::Error> {
113/// AgentToClient::builder()
114/// .on_receive_request(async |req: InitializeRequest, request_cx, cx| {
115/// request_cx.respond(
116/// InitializeResponse::new(req.protocol_version)
117/// .agent_capabilities(AgentCapabilities::new()),
118/// )
119/// }, sacp::on_receive_request!())
120/// .on_receive_notification(async |notif: StatusUpdate, cx| {
121/// // Process notification
122/// Ok(())
123/// }, sacp::on_receive_notification!())
124/// .serve(transport)
125/// .await?;
126/// # Ok(())
127/// # }
128/// ```
129///
130/// The type parameter on the closure determines which messages are dispatched to it.
131/// Messages that don't match the type are automatically passed to the next handler.
132///
133/// # Implementing Custom Handlers
134///
135/// For advanced use cases, you can implement `JrMessageHandler` directly:
136///
137/// ```ignore
138/// struct MyHandler;
139///
140/// impl JrMessageHandler for MyHandler {
141/// type Link = ClientToAgent;
142///
143/// async fn handle_message(
144/// &mut self,
145/// message: MessageCx,
146/// cx: JrConnectionCx<Self::Role>,
147/// ) -> Result<Handled<MessageCx>, Error> {
148/// if message.method() == "my/custom/method" {
149/// // Handle it
150/// Ok(Handled::Yes)
151/// } else {
152/// // Pass to next handler
153/// Ok(Handled::No { message, retry: false })
154/// }
155/// }
156///
157/// fn describe_chain(&self) -> impl std::fmt::Debug {
158/// "MyHandler"
159/// }
160/// }
161/// ```
162///
163/// # Important: Handlers Must Not Block
164///
165/// The connection processes messages on a single async task. While a handler is running,
166/// no other messages can be processed. For expensive operations, use [`JrConnectionCx::spawn`]
167/// to run work concurrently:
168///
169/// ```
170/// # use sacp::{ClientToAgent, AgentToClient, Component};
171/// # use sacp_test::{expensive_operation, ProcessComplete};
172/// # async fn example(transport: impl Component<AgentToClient>) -> Result<(), sacp::Error> {
173/// # ClientToAgent::builder().run_until(transport, async |cx| {
174/// cx.spawn({
175/// let connection_cx = cx.clone();
176/// async move {
177/// let result = expensive_operation("data").await?;
178/// connection_cx.send_notification(ProcessComplete { result })?;
179/// Ok(())
180/// }
181/// })?;
182/// # Ok(())
183/// # }).await?;
184/// # Ok(())
185/// # }
186/// ```
187#[allow(async_fn_in_trait)]
188/// A handler for incoming JSON-RPC messages.
189///
190/// This trait is implemented by types that can process incoming messages on a connection.
191/// Handlers are registered with a [`JrConnectionBuilder`] and are called in order until
192/// one claims the message.
193pub trait JrMessageHandler: Send {
194 /// The role type for this handler's connection.
195 type Link: JrLink;
196
197 /// Attempt to claim an incoming message (request or notification).
198 ///
199 /// # Important: do not block
200 ///
201 /// The server will not process new messages until this handler returns.
202 /// You should avoid blocking in this callback unless you wish to block the server (e.g., for rate limiting).
203 /// The recommended approach to manage expensive operations is to the [`JrConnectionCx::spawn`] method available on the message context.
204 ///
205 /// # Parameters
206 ///
207 /// * `message` - The incoming message to handle.
208 /// * `cx` - The connection context, used to send messages and access connection state.
209 ///
210 /// # Returns
211 ///
212 /// * `Ok(Handled::Yes)` if the message was claimed. It will not be propagated further.
213 /// * `Ok(Handled::No(message))` if not; the (possibly changed) message will be passed to the remaining handlers.
214 /// * `Err` if an internal error occurs (this will bring down the server).
215 fn handle_message(
216 &mut self,
217 message: MessageCx,
218 cx: JrConnectionCx<Self::Link>,
219 ) -> impl Future<Output = Result<Handled<MessageCx>, crate::Error>> + Send;
220
221 /// Returns a debug description of the registered handlers for diagnostics.
222 fn describe_chain(&self) -> impl std::fmt::Debug;
223}
224
225impl<H: JrMessageHandler> JrMessageHandler for &mut H {
226 type Link = H::Link;
227
228 fn handle_message(
229 &mut self,
230 message: MessageCx,
231 cx: JrConnectionCx<Self::Link>,
232 ) -> impl Future<Output = Result<Handled<MessageCx>, crate::Error>> + Send {
233 H::handle_message(self, message, cx)
234 }
235
236 fn describe_chain(&self) -> impl std::fmt::Debug {
237 H::describe_chain(self)
238 }
239}
240
241/// A JSON-RPC connection that can act as either a server, client, or both.
242///
243/// `JrConnection` provides a builder-style API for creating JSON-RPC servers and clients.
244/// You start by calling `Link::builder()` (e.g., `ClientToAgent::builder()`), then add message
245/// handlers, and finally drive the connection with either [`serve`](JrConnectionBuilder::serve)
246/// or [`run_until`](JrConnectionBuilder::run_until), providing a component implementation
247/// (e.g., [`ByteStreams`] for byte streams).
248///
249/// # JSON-RPC Primer
250///
251/// JSON-RPC 2.0 has two fundamental message types:
252///
253/// * **Requests** - Messages that expect a response. They have an `id` field that gets
254/// echoed back in the response so the sender can correlate them.
255/// * **Notifications** - Fire-and-forget messages with no `id` field. The sender doesn't
256/// expect or receive a response.
257///
258/// # Type-Driven Message Dispatch
259///
260/// The handler registration methods use Rust's type system to determine which messages
261/// to handle. The type parameter you provide controls what gets dispatched to your handler:
262///
263/// ## Single Message Types
264///
265/// The simplest case - handle one specific message type:
266///
267/// ```no_run
268/// # use sacp_test::*;
269/// # use sacp::schema::{InitializeRequest, InitializeResponse, SessionNotification};
270/// # async fn example() -> Result<(), sacp::Error> {
271/// # let connection = mock_connection();
272/// connection
273/// .on_receive_request(async |req: InitializeRequest, request_cx, cx| {
274/// // Handle only InitializeRequest messages
275/// request_cx.respond(InitializeResponse::make())
276/// }, sacp::on_receive_request!())
277/// .on_receive_notification(async |notif: SessionNotification, cx| {
278/// // Handle only SessionUpdate notifications
279/// Ok(())
280/// }, sacp::on_receive_notification!())
281/// # .serve(sacp_test::MockTransport).await?;
282/// # Ok(())
283/// # }
284/// ```
285///
286/// ## Enum Message Types
287///
288/// You can also handle multiple related messages with a single handler by defining an enum
289/// that implements the appropriate trait ([`JrRequest`] or [`JrNotification`]):
290///
291/// ```no_run
292/// # use sacp_test::*;
293/// # use sacp::{JrRequest, JrMessage, UntypedMessage};
294/// # use sacp::schema::{InitializeRequest, InitializeResponse, PromptRequest, PromptResponse};
295/// # async fn example() -> Result<(), sacp::Error> {
296/// # let connection = mock_connection();
297/// // Define an enum for multiple request types
298/// #[derive(Debug, Clone)]
299/// enum MyRequests {
300/// Initialize(InitializeRequest),
301/// Prompt(PromptRequest),
302/// }
303///
304/// // Implement JrRequest for your enum
305/// # impl JrMessage for MyRequests {
306/// # fn method(&self) -> &str { "myRequests" }
307/// # fn to_untyped_message(&self) -> Result<UntypedMessage, sacp::Error> { todo!() }
308/// # fn parse_message(_method: &str, _params: &impl serde::Serialize) -> Option<Result<Self, sacp::Error>> { None }
309/// # }
310/// impl JrRequest for MyRequests { type Response = serde_json::Value; }
311///
312/// // Handle all variants in one place
313/// connection.on_receive_request(async |req: MyRequests, request_cx, cx| {
314/// match req {
315/// MyRequests::Initialize(init) => { request_cx.respond(serde_json::json!({})) }
316/// MyRequests::Prompt(prompt) => { request_cx.respond(serde_json::json!({})) }
317/// }
318/// }, sacp::on_receive_request!())
319/// # .serve(sacp_test::MockTransport).await?;
320/// # Ok(())
321/// # }
322/// ```
323///
324/// ## Mixed Message Types
325///
326/// For enums containing both requests AND notifications, use [`on_receive_message`](Self::on_receive_message):
327///
328/// ```no_run
329/// # use sacp_test::*;
330/// # use sacp::MessageCx;
331/// # use sacp::schema::{InitializeRequest, InitializeResponse, SessionNotification};
332/// # async fn example() -> Result<(), sacp::Error> {
333/// # let connection = mock_connection();
334/// // on_receive_message receives MessageCx which can be either a request or notification
335/// connection.on_receive_message(async |msg: MessageCx<InitializeRequest, SessionNotification>, _cx| {
336/// match msg {
337/// MessageCx::Request(req, request_cx) => {
338/// request_cx.respond(InitializeResponse::make())
339/// }
340/// MessageCx::Notification(notif) => {
341/// Ok(())
342/// }
343/// }
344/// }, sacp::on_receive_message!())
345/// # .serve(sacp_test::MockTransport).await?;
346/// # Ok(())
347/// # }
348/// ```
349///
350/// # Handler Registration
351///
352/// Register handlers using these methods (listed from most common to most flexible):
353///
354/// * [`on_receive_request`](Self::on_receive_request) - Handle JSON-RPC requests (messages expecting responses)
355/// * [`on_receive_notification`](Self::on_receive_notification) - Handle JSON-RPC notifications (fire-and-forget)
356/// * [`on_receive_message`](Self::on_receive_message) - Handle enums containing both requests and notifications
357/// * [`with_handler`](Self::with_handler) - Low-level primitive for maximum flexibility
358///
359/// ## Handler Ordering
360///
361/// Handlers are tried in the order you register them. The first handler that claims a message
362/// (by matching its type) will process it. Subsequent handlers won't see that message:
363///
364/// ```no_run
365/// # use sacp_test::*;
366/// # use sacp::{MessageCx, UntypedMessage};
367/// # use sacp::schema::{InitializeRequest, InitializeResponse, PromptRequest, PromptResponse};
368/// # async fn example() -> Result<(), sacp::Error> {
369/// # let connection = mock_connection();
370/// connection
371/// .on_receive_request(async |req: InitializeRequest, request_cx, cx| {
372/// // This runs first for InitializeRequest
373/// request_cx.respond(InitializeResponse::make())
374/// }, sacp::on_receive_request!())
375/// .on_receive_request(async |req: PromptRequest, request_cx, cx| {
376/// // This runs first for PromptRequest
377/// request_cx.respond(PromptResponse::make())
378/// }, sacp::on_receive_request!())
379/// .on_receive_message(async |msg: MessageCx, cx| {
380/// // This runs for any message not handled above
381/// msg.respond_with_error(sacp::util::internal_error("unknown method"), cx)
382/// }, sacp::on_receive_message!())
383/// # .serve(sacp_test::MockTransport).await?;
384/// # Ok(())
385/// # }
386/// ```
387///
388/// # Event Loop and Concurrency
389///
390/// Understanding the event loop is critical for writing correct handlers.
391///
392/// ## The Event Loop
393///
394/// `JrConnection` runs all handler callbacks on a single async task - the event loop.
395/// While a handler is running, **the server cannot receive new messages**. This means
396/// any blocking or expensive work in your handlers will stall the entire connection.
397///
398/// To avoid blocking the event loop, use [`JrConnectionCx::spawn`] to offload serious
399/// work to concurrent tasks:
400///
401/// ```no_run
402/// # use sacp_test::*;
403/// # async fn example() -> Result<(), sacp::Error> {
404/// # let connection = mock_connection();
405/// connection.on_receive_request(async |req: AnalyzeRequest, request_cx, cx| {
406/// // Clone cx for the spawned task
407/// cx.spawn({
408/// let connection_cx = cx.clone();
409/// async move {
410/// let result = expensive_analysis(&req.data).await?;
411/// connection_cx.send_notification(AnalysisComplete { result })?;
412/// Ok(())
413/// }
414/// })?;
415///
416/// // Respond immediately without blocking
417/// request_cx.respond(AnalysisStarted { job_id: 42 })
418/// }, sacp::on_receive_request!())
419/// # .serve(sacp_test::MockTransport).await?;
420/// # Ok(())
421/// # }
422/// ```
423///
424/// Note that the entire connection runs within one async task, so parallelism must be
425/// managed explicitly using [`spawn`](JrConnectionCx::spawn).
426///
427/// ## The Connection Context
428///
429/// Handler callbacks receive a context object (`cx`) for interacting with the connection:
430///
431/// * **For request handlers** - [`JrRequestCx<R>`] provides [`respond`](JrRequestCx::respond)
432/// to send the response, plus methods to send other messages
433/// * **For notification handlers** - [`JrConnectionCx`] provides methods to send messages
434/// and spawn tasks
435///
436/// Both context types support:
437/// * [`send_request`](JrConnectionCx::send_request) - Send requests to the other side
438/// * [`send_notification`](JrConnectionCx::send_notification) - Send notifications
439/// * [`spawn`](JrConnectionCx::spawn) - Run tasks concurrently without blocking the event loop
440///
441/// The [`JrResponse`] returned by `send_request` provides methods like
442/// [`on_receiving_result`](JrResponse::on_receiving_result) that help you
443/// avoid accidentally blocking the event loop while waiting for responses.
444///
445/// # Driving the Connection
446///
447/// After adding handlers, you must drive the connection using one of two modes:
448///
449/// ## Server Mode: `serve()`
450///
451/// Use [`serve`](Self::serve) when you only need to respond to incoming messages:
452///
453/// ```no_run
454/// # use sacp_test::*;
455/// # async fn example() -> Result<(), sacp::Error> {
456/// # let connection = mock_connection();
457/// connection
458/// .on_receive_request(async |req: MyRequest, request_cx, cx| {
459/// request_cx.respond(MyResponse { status: "ok".into() })
460/// }, sacp::on_receive_request!())
461/// .serve(MockTransport) // Runs until connection closes or error occurs
462/// .await?;
463/// # Ok(())
464/// # }
465/// ```
466///
467/// The connection will process incoming messages and invoke your handlers until the
468/// connection is closed or an error occurs.
469///
470/// ## Client Mode: `run_until()`
471///
472/// Use [`run_until`](Self::run_until) when you need to both handle incoming messages
473/// AND send your own requests/notifications:
474///
475/// ```no_run
476/// # use sacp_test::*;
477/// # use sacp::schema::InitializeRequest;
478/// # async fn example() -> Result<(), sacp::Error> {
479/// # let connection = mock_connection();
480/// connection
481/// .on_receive_request(async |req: MyRequest, request_cx, cx| {
482/// request_cx.respond(MyResponse { status: "ok".into() })
483/// }, sacp::on_receive_request!())
484/// .run_until(MockTransport, async |cx| {
485/// // You can send requests to the other side
486/// let response = cx.send_request(InitializeRequest::make())
487/// .block_task()
488/// .await?;
489///
490/// // And send notifications
491/// cx.send_notification(StatusUpdate { message: "ready".into() })?;
492///
493/// Ok(())
494/// })
495/// .await?;
496/// # Ok(())
497/// # }
498/// ```
499///
500/// The connection will serve incoming messages in the background while your client closure
501/// runs. When the closure returns, the connection shuts down.
502///
503/// # Example: Complete Agent
504///
505/// ```no_run
506/// # use sacp::link::UntypedLink;
507/// # use sacp::{JrConnectionBuilder};
508/// # use sacp::ByteStreams;
509/// # use sacp::schema::{InitializeRequest, InitializeResponse, PromptRequest, PromptResponse, SessionNotification};
510/// # use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
511/// # async fn example() -> Result<(), sacp::Error> {
512/// let transport = ByteStreams::new(
513/// tokio::io::stdout().compat_write(),
514/// tokio::io::stdin().compat(),
515/// );
516///
517/// UntypedLink::builder()
518/// .name("my-agent") // Optional: for debugging logs
519/// .on_receive_request(async |init: InitializeRequest, request_cx, cx| {
520/// let response: InitializeResponse = todo!();
521/// request_cx.respond(response)
522/// }, sacp::on_receive_request!())
523/// .on_receive_request(async |prompt: PromptRequest, request_cx, cx| {
524/// // You can send notifications while processing a request
525/// let notif: SessionNotification = todo!();
526/// cx.send_notification(notif)?;
527///
528/// // Then respond to the request
529/// let response: PromptResponse = todo!();
530/// request_cx.respond(response)
531/// }, sacp::on_receive_request!())
532/// .serve(transport)
533/// .await?;
534/// # Ok(())
535/// # }
536/// ```
537#[must_use]
538pub struct JrConnectionBuilder<H: JrMessageHandler, R: JrResponder<H::Link> = NullResponder> {
539 name: Option<String>,
540
541 /// Handler for incoming messages.
542 handler: H,
543
544 /// Responder for background tasks.
545 responder: R,
546}
547
548impl<Link: JrLink> JrConnectionBuilder<NullHandler<Link>, NullResponder> {
549 /// Create a new JrConnection with the given role.
550 /// This type follows a builder pattern; use other methods to configure and then invoke
551 /// [`Self::serve`] (to use as a server) or [`Self::run_until`] to use as a client.
552 pub(crate) fn new(role: Link) -> Self {
553 Self {
554 name: Default::default(),
555 handler: NullHandler::new(role),
556 responder: NullResponder,
557 }
558 }
559}
560
561impl<H: JrMessageHandler> JrConnectionBuilder<H, NullResponder> {
562 /// Create a new connection builder with the given handler.
563 pub fn new_with(handler: H) -> Self {
564 Self {
565 name: Default::default(),
566 handler,
567 responder: NullResponder,
568 }
569 }
570}
571
572impl<H: JrMessageHandler, R: JrResponder<H::Link>> JrConnectionBuilder<H, R> {
573 /// Set the "name" of this connection -- used only for debugging logs.
574 pub fn name(mut self, name: impl ToString) -> Self {
575 self.name = Some(name.to_string());
576 self
577 }
578
579 /// Merge another [`JrConnectionBuilder`] into this one.
580 ///
581 /// Prefer [`Self::on_receive_request`] or [`Self::on_receive_notification`].
582 /// This is a low-level method that is not intended for general use.
583 pub fn with_connection_builder<H1, R1>(
584 self,
585 other: JrConnectionBuilder<H1, R1>,
586 ) -> JrConnectionBuilder<impl JrMessageHandler<Link = H::Link>, impl JrResponder<H::Link>>
587 where
588 H1: JrMessageHandler<Link = H::Link>,
589 R1: JrResponder<H::Link>,
590 {
591 JrConnectionBuilder {
592 name: self.name,
593 handler: ChainedHandler::new(
594 self.handler,
595 NamedHandler::new(other.name, other.handler),
596 ),
597 responder: ChainResponder::new(self.responder, other.responder),
598 }
599 }
600
601 /// Add a new [`JrMessageHandler`] to the chain.
602 ///
603 /// Prefer [`Self::on_receive_request`] or [`Self::on_receive_notification`].
604 /// This is a low-level method that is not intended for general use.
605 pub fn with_handler<H1>(
606 self,
607 handler: H1,
608 ) -> JrConnectionBuilder<impl JrMessageHandler<Link = H::Link>, R>
609 where
610 H1: JrMessageHandler<Link = H::Link>,
611 {
612 JrConnectionBuilder {
613 name: self.name,
614 handler: ChainedHandler::new(self.handler, handler),
615 responder: self.responder,
616 }
617 }
618
619 /// Add a new [`JrResponder`] to the chain.
620 pub fn with_responder<R1>(
621 self,
622 responder: R1,
623 ) -> JrConnectionBuilder<H, impl JrResponder<H::Link>>
624 where
625 R1: JrResponder<H::Link>,
626 {
627 JrConnectionBuilder {
628 name: self.name,
629 handler: self.handler,
630 responder: ChainResponder::new(self.responder, responder),
631 }
632 }
633
634 /// Enqueue a task to run once the connection is actively serving traffic.
635 #[track_caller]
636 pub fn with_spawned<F, Fut>(self, task: F) -> JrConnectionBuilder<H, impl JrResponder<H::Link>>
637 where
638 F: FnOnce(JrConnectionCx<H::Link>) -> Fut + Send,
639 Fut: Future<Output = Result<(), crate::Error>> + Send,
640 {
641 let location = Location::caller();
642 self.with_responder(SpawnedResponder::new(location, task))
643 }
644
645 /// Register a handler for messages that can be either requests OR notifications.
646 ///
647 /// Use this when you want to handle an enum type that contains both request and
648 /// notification variants. Your handler receives a [`MessageCx<Req, Notif>`] which
649 /// is an enum with two variants:
650 ///
651 /// - `MessageCx::Request(request, request_cx)` - A request with its response context
652 /// - `MessageCx::Notification(notification)` - A notification
653 ///
654 /// # Example
655 ///
656 /// ```no_run
657 /// # use sacp_test::*;
658 /// # use sacp::MessageCx;
659 /// # async fn example() -> Result<(), sacp::Error> {
660 /// # let connection = mock_connection();
661 /// connection.on_receive_message(async |message: MessageCx<MyRequest, StatusUpdate>, _cx| {
662 /// match message {
663 /// MessageCx::Request(req, request_cx) => {
664 /// // Handle request and send response
665 /// request_cx.respond(MyResponse { status: "ok".into() })
666 /// }
667 /// MessageCx::Notification(notif) => {
668 /// // Handle notification (no response needed)
669 /// Ok(())
670 /// }
671 /// }
672 /// }, sacp::on_receive_message!())
673 /// # .serve(sacp_test::MockTransport).await?;
674 /// # Ok(())
675 /// # }
676 /// ```
677 ///
678 /// For most use cases, prefer [`on_receive_request`](Self::on_receive_request) or
679 /// [`on_receive_notification`](Self::on_receive_notification) which provide cleaner APIs
680 /// for handling requests or notifications separately.
681 ///
682 /// # Ordering
683 ///
684 /// This callback runs inside the dispatch loop and blocks further message processing
685 /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
686 /// ordering guarantees and how to avoid deadlocks.
687 pub fn on_receive_message<Req, Notif, F, T, ToFut>(
688 self,
689 op: F,
690 to_future_hack: ToFut,
691 ) -> JrConnectionBuilder<impl JrMessageHandler<Link = H::Link>, R>
692 where
693 H::Link: HasDefaultPeer,
694 Req: JrRequest,
695 Notif: JrNotification,
696 F: AsyncFnMut(MessageCx<Req, Notif>, JrConnectionCx<H::Link>) -> Result<T, crate::Error>
697 + Send,
698 T: IntoHandled<MessageCx<Req, Notif>>,
699 ToFut: Fn(
700 &mut F,
701 MessageCx<Req, Notif>,
702 JrConnectionCx<H::Link>,
703 ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
704 + Send
705 + Sync,
706 {
707 self.with_handler(MessageHandler::new(
708 <H::Link as HasDefaultPeer>::DefaultPeer::default(),
709 <H::Link>::default(),
710 op,
711 to_future_hack,
712 ))
713 }
714
715 /// Register a handler for JSON-RPC requests of type `Req`.
716 ///
717 /// Your handler receives two arguments:
718 /// 1. The request (type `Req`)
719 /// 2. A [`JrRequestCx<R, Req::Response>`] for sending the response
720 ///
721 /// The request context allows you to:
722 /// - Send the response with [`JrRequestCx::respond`]
723 /// - Send notifications to the client with [`JrConnectionCx::send_notification`]
724 /// - Send requests to the client with [`JrConnectionCx::send_request`]
725 ///
726 /// # Example
727 ///
728 /// ```ignore
729 /// # use sacp::link::UntypedLink;
730 /// # use sacp::{JrConnectionBuilder};
731 /// # use sacp::schema::{PromptRequest, PromptResponse, SessionNotification};
732 /// # fn example(connection: JrConnectionBuilder<impl sacp::JrMessageHandler<Link = UntypedLink>>) {
733 /// connection.on_receive_request(async |request: PromptRequest, request_cx, cx| {
734 /// // Send a notification while processing
735 /// let notif: SessionNotification = todo!();
736 /// cx.send_notification(notif)?;
737 ///
738 /// // Do some work...
739 /// let result = todo!("process the prompt");
740 ///
741 /// // Send the response
742 /// let response: PromptResponse = todo!();
743 /// request_cx.respond(response)
744 /// }, sacp::on_receive_request!());
745 /// # }
746 /// ```
747 ///
748 /// # Type Parameter
749 ///
750 /// `Req` can be either a single request type or an enum of multiple request types.
751 /// See the [type-driven dispatch](Self#type-driven-message-dispatch) section for details.
752 ///
753 /// # Ordering
754 ///
755 /// This callback runs inside the dispatch loop and blocks further message processing
756 /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
757 /// ordering guarantees and how to avoid deadlocks.
758 pub fn on_receive_request<Req: JrRequest, F, T, ToFut>(
759 self,
760 op: F,
761 to_future_hack: ToFut,
762 ) -> JrConnectionBuilder<impl JrMessageHandler<Link = H::Link>, R>
763 where
764 H::Link: HasDefaultPeer,
765 F: AsyncFnMut(
766 Req,
767 JrRequestCx<Req::Response>,
768 JrConnectionCx<H::Link>,
769 ) -> Result<T, crate::Error>
770 + Send,
771 T: IntoHandled<(Req, JrRequestCx<Req::Response>)>,
772 ToFut: Fn(
773 &mut F,
774 Req,
775 JrRequestCx<Req::Response>,
776 JrConnectionCx<H::Link>,
777 ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
778 + Send
779 + Sync,
780 {
781 self.with_handler(RequestHandler::new(
782 <H::Link as HasDefaultPeer>::DefaultPeer::default(),
783 <H::Link>::default(),
784 op,
785 to_future_hack,
786 ))
787 }
788
789 /// Register a handler for JSON-RPC notifications of type `Notif`.
790 ///
791 /// Notifications are fire-and-forget messages that don't expect a response.
792 /// Your handler receives:
793 /// 1. The notification (type `Notif`)
794 /// 2. A [`JrConnectionCx<R>`] for sending messages to the other side
795 ///
796 /// Unlike request handlers, you cannot send a response (notifications don't have IDs),
797 /// but you can still send your own requests and notifications using the context.
798 ///
799 /// # Example
800 ///
801 /// ```no_run
802 /// # use sacp_test::*;
803 /// # async fn example() -> Result<(), sacp::Error> {
804 /// # let connection = mock_connection();
805 /// connection.on_receive_notification(async |notif: SessionUpdate, cx| {
806 /// // Process the notification
807 /// update_session_state(¬if)?;
808 ///
809 /// // Optionally send a notification back
810 /// cx.send_notification(StatusUpdate {
811 /// message: "Acknowledged".into(),
812 /// })?;
813 ///
814 /// Ok(())
815 /// }, sacp::on_receive_notification!())
816 /// # .serve(sacp_test::MockTransport).await?;
817 /// # Ok(())
818 /// # }
819 /// ```
820 ///
821 /// # Type Parameter
822 ///
823 /// `Notif` can be either a single notification type or an enum of multiple notification types.
824 /// See the [type-driven dispatch](Self#type-driven-message-dispatch) section for details.
825 ///
826 /// # Ordering
827 ///
828 /// This callback runs inside the dispatch loop and blocks further message processing
829 /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
830 /// ordering guarantees and how to avoid deadlocks.
831 pub fn on_receive_notification<Notif, F, T, ToFut>(
832 self,
833 op: F,
834 to_future_hack: ToFut,
835 ) -> JrConnectionBuilder<impl JrMessageHandler<Link = H::Link>, R>
836 where
837 H::Link: HasDefaultPeer,
838 Notif: JrNotification,
839 F: AsyncFnMut(Notif, JrConnectionCx<H::Link>) -> Result<T, crate::Error> + Send,
840 T: IntoHandled<(Notif, JrConnectionCx<H::Link>)>,
841 ToFut: Fn(
842 &mut F,
843 Notif,
844 JrConnectionCx<H::Link>,
845 ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
846 + Send
847 + Sync,
848 {
849 self.with_handler(NotificationHandler::new(
850 <H::Link as HasDefaultPeer>::DefaultPeer::default(),
851 <H::Link>::default(),
852 op,
853 to_future_hack,
854 ))
855 }
856
857 /// Register a handler for messages from a specific peer.
858 ///
859 /// This is similar to [`on_receive_message`](Self::on_receive_message), but allows
860 /// specifying the source peer explicitly. This is useful when receiving messages
861 /// from a peer that requires message transformation (e.g., unwrapping `SuccessorMessage`
862 /// envelopes when receiving from an agent via a proxy).
863 ///
864 /// For the common case of receiving from the default counterpart, use
865 /// [`on_receive_message`](Self::on_receive_message) instead.
866 ///
867 /// # Ordering
868 ///
869 /// This callback runs inside the dispatch loop and blocks further message processing
870 /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
871 /// ordering guarantees and how to avoid deadlocks.
872 pub fn on_receive_message_from<
873 Req: JrRequest,
874 Notif: JrNotification,
875 Peer: JrPeer,
876 F,
877 T,
878 ToFut,
879 >(
880 self,
881 peer: Peer,
882 op: F,
883 to_future_hack: ToFut,
884 ) -> JrConnectionBuilder<impl JrMessageHandler<Link = H::Link>, R>
885 where
886 H::Link: HasPeer<Peer>,
887 F: AsyncFnMut(MessageCx<Req, Notif>, JrConnectionCx<H::Link>) -> Result<T, crate::Error>
888 + Send,
889 T: IntoHandled<MessageCx<Req, Notif>>,
890 ToFut: Fn(
891 &mut F,
892 MessageCx<Req, Notif>,
893 JrConnectionCx<H::Link>,
894 ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
895 + Send
896 + Sync,
897 {
898 self.with_handler(MessageHandler::new(
899 peer,
900 <H::Link>::default(),
901 op,
902 to_future_hack,
903 ))
904 }
905
906 /// Register a handler for JSON-RPC requests from a specific peer.
907 ///
908 /// This is similar to [`on_receive_request`](Self::on_receive_request), but allows
909 /// specifying the source peer explicitly. This is useful when receiving messages
910 /// from a peer that requires message transformation (e.g., unwrapping `SuccessorRequest`
911 /// envelopes when receiving from an agent via a proxy).
912 ///
913 /// For the common case of receiving from the default counterpart, use
914 /// [`on_receive_request`](Self::on_receive_request) instead.
915 ///
916 /// # Example
917 ///
918 /// ```ignore
919 /// use sacp::Agent;
920 /// use sacp::schema::InitializeRequest;
921 ///
922 /// // Conductor receiving from agent direction - messages will be unwrapped from SuccessorMessage
923 /// connection.on_receive_request_from(Agent, async |req: InitializeRequest, request_cx, cx| {
924 /// // Handle the request
925 /// request_cx.respond(InitializeResponse::make())
926 /// })
927 /// ```
928 ///
929 /// # Ordering
930 ///
931 /// This callback runs inside the dispatch loop and blocks further message processing
932 /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
933 /// ordering guarantees and how to avoid deadlocks.
934 pub fn on_receive_request_from<Req: JrRequest, Peer: JrPeer, F, T, ToFut>(
935 self,
936 peer: Peer,
937 op: F,
938 to_future_hack: ToFut,
939 ) -> JrConnectionBuilder<impl JrMessageHandler<Link = H::Link>, R>
940 where
941 H::Link: HasPeer<Peer>,
942 F: AsyncFnMut(
943 Req,
944 JrRequestCx<Req::Response>,
945 JrConnectionCx<H::Link>,
946 ) -> Result<T, crate::Error>
947 + Send,
948 T: IntoHandled<(Req, JrRequestCx<Req::Response>)>,
949 ToFut: Fn(
950 &mut F,
951 Req,
952 JrRequestCx<Req::Response>,
953 JrConnectionCx<H::Link>,
954 ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
955 + Send
956 + Sync,
957 {
958 self.with_handler(RequestHandler::new(
959 peer,
960 <H::Link>::default(),
961 op,
962 to_future_hack,
963 ))
964 }
965
966 /// Register a handler for JSON-RPC notifications from a specific peer.
967 ///
968 /// This is similar to [`on_receive_notification`](Self::on_receive_notification), but allows
969 /// specifying the source peer explicitly. This is useful when receiving messages
970 /// from a peer that requires message transformation (e.g., unwrapping `SuccessorNotification`
971 /// envelopes when receiving from an agent via a proxy).
972 ///
973 /// For the common case of receiving from the default counterpart, use
974 /// [`on_receive_notification`](Self::on_receive_notification) instead.
975 ///
976 /// # Ordering
977 ///
978 /// This callback runs inside the dispatch loop and blocks further message processing
979 /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
980 /// ordering guarantees and how to avoid deadlocks.
981 pub fn on_receive_notification_from<Notif: JrNotification, Peer: JrPeer, F, T, ToFut>(
982 self,
983 peer: Peer,
984 op: F,
985 to_future_hack: ToFut,
986 ) -> JrConnectionBuilder<impl JrMessageHandler<Link = H::Link>, R>
987 where
988 H::Link: HasPeer<Peer>,
989 F: AsyncFnMut(Notif, JrConnectionCx<H::Link>) -> Result<T, crate::Error> + Send,
990 T: IntoHandled<(Notif, JrConnectionCx<H::Link>)>,
991 ToFut: Fn(
992 &mut F,
993 Notif,
994 JrConnectionCx<H::Link>,
995 ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
996 + Send
997 + Sync,
998 {
999 self.with_handler(NotificationHandler::new(
1000 peer,
1001 <H::Link>::default(),
1002 op,
1003 to_future_hack,
1004 ))
1005 }
1006
1007 /// In a proxy, add this MCP server to new sessions passing through the proxy.
1008 ///
1009 /// This adds a handler that intercepts `session/new` requests to include the
1010 /// registered MCP server.
1011 ///
1012 /// # Example
1013 ///
1014 /// ```ignore
1015 /// use sacp::mcp_server::McpServiceRegistry;
1016 /// use sacp::ProxyToConductor;
1017 ///
1018 /// ProxyToConductor::builder()
1019 /// .name("my-proxy")
1020 /// .provide_mcp(McpServiceRegistry::default().with_mcp_server("example", my_server)?)
1021 /// .serve(connection)
1022 /// .await?;
1023 /// ```
1024 pub fn with_mcp_server<Link: JrLink, McpR: JrResponder<Link>>(
1025 self,
1026 server: McpServer<Link, McpR>,
1027 ) -> JrConnectionBuilder<impl JrMessageHandler<Link = Link>, impl JrResponder<Link>>
1028 where
1029 H: JrMessageHandler<Link = Link>,
1030 Link: HasPeer<ClientPeer> + HasPeer<AgentPeer>,
1031 {
1032 let (message_handler, mcp_responder) = server.into_handler_and_responder();
1033 self.with_handler(message_handler)
1034 .with_responder(mcp_responder)
1035 }
1036
1037 /// Connect these handlers to a transport layer.
1038 /// The resulting connection must then be either [served](`JrConnection::serve`) or [run until completion](`JrConnection::run_until`).
1039 pub fn connect_to(
1040 self,
1041 transport: impl Component<<H::Link as JrLink>::ConnectsTo> + 'static,
1042 ) -> Result<JrConnection<H, R>, crate::Error> {
1043 let Self {
1044 name,
1045 handler,
1046 responder,
1047 } = self;
1048
1049 let (outgoing_tx, outgoing_rx) = mpsc::unbounded();
1050 let (new_task_tx, new_task_rx) = mpsc::unbounded();
1051 let (dynamic_handler_tx, dynamic_handler_rx) = mpsc::unbounded();
1052 let cx = JrConnectionCx::new(outgoing_tx, new_task_tx, dynamic_handler_tx);
1053
1054 // Convert transport into server - this returns a channel for us to use
1055 // and a future that runs the transport
1056 let transport_component = crate::DynComponent::new(transport);
1057 let (transport_channel, transport_future) = transport_component.into_server();
1058 cx.spawn(transport_future)?;
1059
1060 // Destructure the channel endpoints
1061 let Channel {
1062 rx: transport_incoming_rx,
1063 tx: transport_outgoing_tx,
1064 } = transport_channel;
1065
1066 Ok(JrConnection {
1067 cx,
1068 name,
1069 outgoing_rx,
1070 new_task_rx,
1071 transport_outgoing_tx,
1072 transport_incoming_rx,
1073 dynamic_handler_rx,
1074 handler,
1075 responder,
1076 })
1077 }
1078
1079 /// Apply the registered handlers to a single message.
1080 ///
1081 /// This method processes one message through all registered handlers, attempting to
1082 /// match it against each handler in order. This is useful when implementing
1083 /// custom message handling logic or when you need fine-grained control over message
1084 /// processing.
1085 ///
1086 /// # Returns
1087 ///
1088 /// - `Ok(Handled::Claimed)` - A handler claimed and processed the message
1089 /// - `Ok(Handled::Unclaimed(message))` - No handler matched the message
1090 /// - `Err(_)` - A handler encountered an error while processing
1091 ///
1092 /// # Borrow Checker Considerations
1093 ///
1094 /// You may find that [`MatchMessage`](`crate::util::MatchMessage`) is a better choice than this method
1095 /// for implementing custom handlers. It offers a very similar API to
1096 /// [`JrConnectionBuilder`] but is structured to apply each test one at a time
1097 /// (sequentially) instead of setting them all up at once. This sequential approach
1098 /// often interacts better with the borrow checker, at the cost of requiring `.await`
1099 /// calls between each handler and only working for processing a single message.
1100 ///
1101 /// # Example: Borrow Checker Challenges
1102 ///
1103 /// When building a connection with `async {}` blocks (non-move), you might encounter
1104 /// borrow checker errors if multiple handlers need access to the same mutable state:
1105 ///
1106 /// ```compile_fail
1107 /// # use sacp::{JrConnectionBuilder, JrRequestCx};
1108 /// # use sacp::schema::{InitializeRequest, InitializeResponse};
1109 /// # use sacp::schema::{PromptRequest, PromptResponse};
1110 /// # async fn example() -> Result<(), sacp::Error> {
1111 /// let mut state = String::from("initial");
1112 ///
1113 /// // This fails to compile because both handlers borrow `state` mutably,
1114 /// // and the futures are set up at the same time (even though only one will run)
1115 /// let chain = UntypedLink::builder()
1116 /// .on_receive_request(async |req: InitializeRequest, cx: JrRequestCx| {
1117 /// state.push_str(" - initialized"); // First mutable borrow
1118 /// cx.respond(InitializeResponse::make())
1119 /// }, sacp::on_receive_request!())
1120 /// .on_receive_request(async |req: PromptRequest, cx: JrRequestCx| {
1121 /// state.push_str(" - prompted"); // Second mutable borrow - ERROR!
1122 /// cx.respond(PromptResponse { content: vec![], stopReason: None })
1123 /// }, sacp::on_receive_request!());
1124 /// # Ok(())
1125 /// # }
1126 /// ```
1127 ///
1128 /// You can work around this by using `apply()` to process messages one at a time,
1129 /// or use [`MatchMessage`](`crate::util::MatchMessage`) which provides a similar API but applies handlers sequentially:
1130 ///
1131 /// ```ignore
1132 /// use sacp::{MessageCx, Handled};
1133 /// use sacp::util::MatchMessage;
1134 ///
1135 /// async fn handle_with_state(
1136 /// message: MessageCx,
1137 /// state: &mut String,
1138 /// ) -> Result<Handled<MessageCx>, sacp::Error> {
1139 /// MatchMessage::new(message)
1140 /// .on_request(async |req: InitializeRequest, request_cx| {
1141 /// state.push_str(" - initialized"); // Sequential - OK!
1142 /// request_cx.respond(InitializeResponse::make())
1143 /// })
1144 /// .on_request(async |req: PromptRequest, request_cx| {
1145 /// state.push_str(" - prompted"); // Sequential - OK!
1146 /// request_cx.respond(PromptResponse { content: vec![], stopReason: None })
1147 /// })
1148 /// .otherwise(async |msg| Ok(Handled::Unclaimed(msg)))
1149 /// .await
1150 /// }
1151 /// ```
1152 pub async fn apply(
1153 &mut self,
1154 message: MessageCx,
1155 cx: JrConnectionCx<H::Link>,
1156 ) -> Result<Handled<MessageCx>, crate::Error> {
1157 self.handler.handle_message(message, cx).await
1158 }
1159
1160 /// Convenience method to connect to a transport and serve.
1161 ///
1162 /// This is equivalent to:
1163 /// ```text
1164 /// handler_chain.connect_to(transport)?.serve().await
1165 /// ```
1166 pub async fn serve(
1167 self,
1168 transport: impl Component<<H::Link as JrLink>::ConnectsTo> + 'static,
1169 ) -> Result<(), crate::Error> {
1170 self.connect_to(transport)?.serve().await
1171 }
1172
1173 /// Convenience method to connect to a transport and run until a closure completes.
1174 ///
1175 /// This is equivalent to:
1176 /// ```text
1177 /// handler_chain.connect_to(transport)?.run_until(main_fn).await
1178 /// ```
1179 pub async fn run_until(
1180 self,
1181 transport: impl Component<<H::Link as JrLink>::ConnectsTo> + 'static,
1182 main_fn: impl AsyncFnOnce(JrConnectionCx<H::Link>) -> Result<(), crate::Error>,
1183 ) -> Result<(), crate::Error> {
1184 self.connect_to(transport)?.run_until(main_fn).await
1185 }
1186}
1187
1188impl<H, R> Component<H::Link> for JrConnectionBuilder<H, R>
1189where
1190 H: JrMessageHandler + 'static,
1191 R: JrResponder<H::Link> + 'static,
1192{
1193 async fn serve(
1194 self,
1195 client: impl Component<<H::Link as JrLink>::ConnectsTo>,
1196 ) -> Result<(), crate::Error> {
1197 self.connect_to(client)?.serve().await
1198 }
1199}
1200
1201/// A JSON-RPC connection with an active transport.
1202///
1203/// This type represents a `JrConnectionBuilder` that has been connected to a transport
1204/// via `connect_to()`. It can be driven in two modes:
1205///
1206/// - [`serve()`](Self::serve) - Run as a server, handling incoming messages until the connection closes
1207/// - [`run_until()`](Self::run_until) - Run until a closure completes, allowing you to send requests/notifications
1208///
1209/// Most users won't construct this directly - instead use `JrConnectionBuilder::connect_to()` or
1210/// `JrConnectionBuilder::serve()` for convenience.
1211pub struct JrConnection<H: JrMessageHandler, R: JrResponder<H::Link> = NullResponder> {
1212 cx: JrConnectionCx<H::Link>,
1213 name: Option<String>,
1214 outgoing_rx: mpsc::UnboundedReceiver<OutgoingMessage>,
1215 new_task_rx: mpsc::UnboundedReceiver<Task>,
1216 transport_outgoing_tx: mpsc::UnboundedSender<Result<jsonrpcmsg::Message, crate::Error>>,
1217 transport_incoming_rx: mpsc::UnboundedReceiver<Result<jsonrpcmsg::Message, crate::Error>>,
1218 dynamic_handler_rx: mpsc::UnboundedReceiver<DynamicHandlerMessage<H::Link>>,
1219 handler: H,
1220 responder: R,
1221}
1222
1223impl<H: JrMessageHandler, R: JrResponder<H::Link>> JrConnection<H, R> {
1224 /// Run the connection in server mode with the provided transport.
1225 ///
1226 /// This drives the connection by continuously processing messages from the transport
1227 /// and dispatching them to your registered handlers. The connection will run until:
1228 /// - The transport closes (e.g., EOF on byte streams)
1229 /// - An error occurs
1230 /// - One of your handlers returns an error
1231 ///
1232 /// The transport is responsible for serializing and deserializing `jsonrpcmsg::Message`
1233 /// values to/from the underlying I/O mechanism (byte streams, channels, etc.).
1234 ///
1235 /// Use this mode when you only need to respond to incoming messages and don't need
1236 /// to initiate your own requests. If you need to send requests to the other side,
1237 /// use [`run_until`](Self::run_until) instead.
1238 ///
1239 /// # Example: Byte Stream Transport
1240 ///
1241 /// ```no_run
1242 /// # use sacp::link::UntypedLink;
1243 /// # use sacp::{JrConnectionBuilder};
1244 /// # use sacp::ByteStreams;
1245 /// # use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
1246 /// # use sacp_test::*;
1247 /// # async fn example() -> Result<(), sacp::Error> {
1248 /// let transport = ByteStreams::new(
1249 /// tokio::io::stdout().compat_write(),
1250 /// tokio::io::stdin().compat(),
1251 /// );
1252 ///
1253 /// UntypedLink::builder()
1254 /// .on_receive_request(async |req: MyRequest, request_cx, cx| {
1255 /// request_cx.respond(MyResponse { status: "ok".into() })
1256 /// }, sacp::on_receive_request!())
1257 /// .serve(transport)
1258 /// .await?;
1259 /// # Ok(())
1260 /// # }
1261 /// ```
1262 pub async fn serve(self) -> Result<(), crate::Error> {
1263 self.run_until(async move |_cx| future::pending().await)
1264 .await
1265 }
1266
1267 /// Run the connection until the provided closure completes.
1268 ///
1269 /// This drives the connection by:
1270 /// 1. Running your registered handlers in the background to process incoming messages
1271 /// 2. Executing your `main_fn` closure with a [`JrConnectionCx<R>`] for sending requests/notifications
1272 ///
1273 /// The connection stays active until your `main_fn` returns, then shuts down gracefully.
1274 /// If the connection closes unexpectedly before `main_fn` completes, this returns an error.
1275 ///
1276 /// Use this mode when you need to initiate communication (send requests/notifications)
1277 /// in addition to responding to incoming messages. For server-only mode where you just
1278 /// respond to messages, use [`serve`](Self::serve) instead.
1279 ///
1280 /// # Example
1281 ///
1282 /// ```no_run
1283 /// # use sacp::link::UntypedLink;
1284 /// # use sacp::{JrConnectionBuilder};
1285 /// # use sacp::ByteStreams;
1286 /// # use sacp::schema::InitializeRequest;
1287 /// # use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
1288 /// # use sacp_test::*;
1289 /// # async fn example() -> Result<(), sacp::Error> {
1290 /// let transport = ByteStreams::new(
1291 /// tokio::io::stdout().compat_write(),
1292 /// tokio::io::stdin().compat(),
1293 /// );
1294 ///
1295 /// UntypedLink::builder()
1296 /// .on_receive_request(async |req: MyRequest, request_cx, cx| {
1297 /// // Handle incoming requests in the background
1298 /// request_cx.respond(MyResponse { status: "ok".into() })
1299 /// }, sacp::on_receive_request!())
1300 /// .connect_to(transport)?
1301 /// .run_until(async |cx| {
1302 /// // Initialize the protocol
1303 /// let init_response = cx.send_request(InitializeRequest::make())
1304 /// .block_task()
1305 /// .await?;
1306 ///
1307 /// // Send more requests...
1308 /// let result = cx.send_request(MyRequest {})
1309 /// .block_task()
1310 /// .await?;
1311 ///
1312 /// // When this closure returns, the connection shuts down
1313 /// Ok(())
1314 /// })
1315 /// .await?;
1316 /// # Ok(())
1317 /// # }
1318 /// ```
1319 ///
1320 /// # Parameters
1321 ///
1322 /// - `main_fn`: Your client logic. Receives a [`JrConnectionCx<R>`] for sending messages.
1323 ///
1324 /// # Errors
1325 ///
1326 /// Returns an error if the connection closes before `main_fn` completes.
1327 pub async fn run_until<T>(
1328 self,
1329 main_fn: impl AsyncFnOnce(JrConnectionCx<H::Link>) -> Result<T, crate::Error>,
1330 ) -> Result<T, crate::Error> {
1331 let JrConnection {
1332 cx,
1333 name,
1334 outgoing_rx,
1335 new_task_rx,
1336 handler,
1337 responder,
1338 transport_outgoing_tx,
1339 transport_incoming_rx,
1340 dynamic_handler_rx,
1341 } = self;
1342 let (reply_tx, reply_rx) = mpsc::unbounded();
1343
1344 crate::util::instrument_with_connection_name(name, async move {
1345 let background = async {
1346 futures::try_join!(
1347 // Protocol layer: OutgoingMessage → jsonrpcmsg::Message
1348 outgoing_actor::outgoing_protocol_actor(
1349 outgoing_rx,
1350 reply_tx.clone(),
1351 transport_outgoing_tx,
1352 ),
1353 // Protocol layer: jsonrpcmsg::Message → handler/reply routing
1354 incoming_actor::incoming_protocol_actor(
1355 &cx,
1356 transport_incoming_rx,
1357 dynamic_handler_rx,
1358 reply_rx,
1359 handler,
1360 ),
1361 task_actor::task_actor(new_task_rx, &cx),
1362 responder.run(cx.clone()),
1363 )?;
1364 Ok(())
1365 };
1366
1367 crate::util::run_until(background, main_fn(cx.clone())).await
1368 })
1369 .await
1370 }
1371}
1372
1373/// The payload sent through the response oneshot channel.
1374///
1375/// Includes the response value and an optional ack channel for dispatch loop
1376/// synchronization.
1377pub(crate) struct ResponsePayload {
1378 /// The response result - either the JSON value or an error.
1379 pub(crate) result: Result<serde_json::Value, crate::Error>,
1380
1381 /// Optional acknowledgment channel for dispatch loop synchronization.
1382 ///
1383 /// When present, the receiver must send on this channel to signal that
1384 /// response processing is complete, allowing the dispatch loop to continue
1385 /// to the next message.
1386 ///
1387 /// This is `None` for error paths where the response is sent directly
1388 /// (e.g., when the outgoing channel is broken) rather than through the
1389 /// normal dispatch loop flow.
1390 pub(crate) ack_tx: Option<oneshot::Sender<()>>,
1391}
1392
1393impl std::fmt::Debug for ResponsePayload {
1394 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1395 f.debug_struct("ResponsePayload")
1396 .field("result", &self.result)
1397 .field("ack_tx", &self.ack_tx.as_ref().map(|_| "..."))
1398 .finish()
1399 }
1400}
1401
1402/// Message sent to the incoming actor for reply subscription management.
1403enum ReplyMessage {
1404 /// Subscribe to receive a response for the given request id.
1405 /// When a response with this id arrives, it will be sent through the oneshot
1406 /// along with an ack channel that must be signaled when processing is complete.
1407 Subscribe(jsonrpcmsg::Id, oneshot::Sender<ResponsePayload>),
1408}
1409
1410impl std::fmt::Debug for ReplyMessage {
1411 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1412 match self {
1413 ReplyMessage::Subscribe(id, _) => f.debug_tuple("Subscribe").field(id).finish(),
1414 }
1415 }
1416}
1417
1418/// Messages send to be serialized over the transport.
1419#[derive(Debug)]
1420enum OutgoingMessage {
1421 /// Send a request to the server.
1422 Request {
1423 /// method to use in the request
1424 method: String,
1425
1426 /// parameters for the request
1427 params: Option<jsonrpcmsg::Params>,
1428
1429 /// where to send the response when it arrives (includes ack channel)
1430 response_tx: oneshot::Sender<ResponsePayload>,
1431 },
1432
1433 /// Send a notification to the server.
1434 Notification {
1435 /// method to use in the request
1436 method: String,
1437
1438 /// parameters for the request
1439 params: Option<jsonrpcmsg::Params>,
1440 },
1441
1442 /// Send a reponse to a message from the server
1443 Response {
1444 id: jsonrpcmsg::Id,
1445
1446 response: Result<serde_json::Value, crate::Error>,
1447 },
1448
1449 /// Send a generalized error message
1450 Error { error: crate::Error },
1451}
1452
1453/// Return type from JrHandler; indicates whether the request was handled or not.
1454#[must_use]
1455pub enum Handled<T> {
1456 /// The message was handled
1457 Yes,
1458
1459 /// The message was not handled; returns the original value.
1460 ///
1461 /// If `retry` is true,
1462 No {
1463 /// The message to be passed to subsequent handlers
1464 /// (typically the original message, but it may have been
1465 /// mutated.)
1466 message: T,
1467
1468 /// If true, request the message to be queued and retried with
1469 /// dynamic handlers as they are added.
1470 ///
1471 /// This is used for managing session updates since the dynamic
1472 /// handler for a session cannot be added until the response to the
1473 /// new session request has been processed and there may be updates
1474 /// that get processed at the same time.
1475 retry: bool,
1476 },
1477}
1478
1479/// Trait for converting handler return values into [`Handled`].
1480///
1481/// This trait allows handlers to return either `()` (which becomes `Handled::Yes`)
1482/// or an explicit `Handled<T>` value for more control over handler propagation.
1483pub trait IntoHandled<T> {
1484 /// Convert this value into a `Handled<T>`.
1485 fn into_handled(self) -> Handled<T>;
1486}
1487
1488impl<T> IntoHandled<T> for () {
1489 fn into_handled(self) -> Handled<T> {
1490 Handled::Yes
1491 }
1492}
1493
1494impl<T> IntoHandled<T> for Handled<T> {
1495 fn into_handled(self) -> Handled<T> {
1496 self
1497 }
1498}
1499
1500/// Trait for types that can provide transport for JSON-RPC messages.
1501///
1502/// Implementations of this trait bridge between the internal protocol channels
1503/// (which carry `jsonrpcmsg::Message`) and the actual I/O mechanism (byte streams,
1504/// in-process channels, network sockets, etc.).
1505///
1506/// The transport layer is responsible only for moving `jsonrpcmsg::Message` in and out.
1507/// It has no knowledge of protocol semantics like request/response correlation, ID assignment,
1508/// or handler dispatch - those are handled by the protocol layer in `JrConnection`.
1509///
1510/// # Example
1511///
1512/// See [`ByteStreams`] for the standard byte stream implementation.
1513
1514/// Connection context for sending messages and spawning tasks.
1515///
1516/// This is the primary handle for interacting with the JSON-RPC connection from
1517/// within handler callbacks. You can use it to:
1518///
1519/// * Send requests and notifications to the other side
1520/// * Spawn concurrent tasks that run alongside the connection
1521/// * Respond to requests (via [`JrRequestCx`] which wraps this)
1522///
1523/// # Cloning
1524///
1525/// `JrConnectionCx` is cheaply cloneable - all clones refer to the same underlying connection.
1526/// This makes it easy to share across async tasks.
1527///
1528/// # Event Loop and Concurrency
1529///
1530/// Handler callbacks run on the event loop, which means the connection cannot process new
1531/// messages while your handler is running. Use [`spawn`](Self::spawn) to offload any
1532/// expensive or blocking work to concurrent tasks.
1533///
1534/// See the [Event Loop and Concurrency](JrConnection#event-loop-and-concurrency) section
1535/// for more details.
1536#[derive(Clone, Debug)]
1537pub struct JrConnectionCx<Link> {
1538 #[expect(dead_code)]
1539 role: Link,
1540 message_tx: OutgoingMessageTx,
1541 task_tx: TaskTx,
1542 dynamic_handler_tx: mpsc::UnboundedSender<DynamicHandlerMessage<Link>>,
1543}
1544
1545impl<Link: JrLink> JrConnectionCx<Link> {
1546 fn new(
1547 message_tx: mpsc::UnboundedSender<OutgoingMessage>,
1548 task_tx: mpsc::UnboundedSender<Task>,
1549 dynamic_handler_tx: mpsc::UnboundedSender<DynamicHandlerMessage<Link>>,
1550 ) -> Self {
1551 Self {
1552 role: Link::default(),
1553 message_tx,
1554 task_tx,
1555 dynamic_handler_tx,
1556 }
1557 }
1558
1559 /// Spawns a task that will run so long as the JSON-RPC connection is being served.
1560 ///
1561 /// This is the primary mechanism for offloading expensive work from handler callbacks
1562 /// to avoid blocking the event loop. Spawned tasks run concurrently with the connection,
1563 /// allowing the server to continue processing messages.
1564 ///
1565 /// # Event Loop
1566 ///
1567 /// Handler callbacks run on the event loop, which cannot process new messages while
1568 /// your handler is running. Use `spawn` for any expensive operations:
1569 ///
1570 /// ```no_run
1571 /// # use sacp_test::*;
1572 /// # async fn example() -> Result<(), sacp::Error> {
1573 /// # let connection = mock_connection();
1574 /// connection.on_receive_request(async |req: ProcessRequest, request_cx, cx| {
1575 /// // Clone cx for the spawned task
1576 /// cx.spawn({
1577 /// let connection_cx = cx.clone();
1578 /// async move {
1579 /// let result = expensive_operation(&req.data).await?;
1580 /// connection_cx.send_notification(ProcessComplete { result })?;
1581 /// Ok(())
1582 /// }
1583 /// })?;
1584 ///
1585 /// // Respond immediately
1586 /// request_cx.respond(ProcessResponse { result: "started".into() })
1587 /// }, sacp::on_receive_request!())
1588 /// # .serve(sacp_test::MockTransport).await?;
1589 /// # Ok(())
1590 /// # }
1591 /// ```
1592 ///
1593 /// # Errors
1594 ///
1595 /// If the spawned task returns an error, the entire server will shut down.
1596 #[track_caller]
1597 pub fn spawn(
1598 &self,
1599 task: impl IntoFuture<Output = Result<(), crate::Error>, IntoFuture: Send + 'static>,
1600 ) -> Result<(), crate::Error> {
1601 let location = std::panic::Location::caller();
1602 let task = task.into_future();
1603 Task::new(location, task).spawn(&self.task_tx)
1604 }
1605
1606 /// Spawn a JSON-RPC connection in the background and return a [`JrConnectionCx`] for sending messages to it.
1607 ///
1608 /// This is useful for creating multiple connections that communicate with each other,
1609 /// such as implementing proxy patterns or connecting to multiple backend services.
1610 ///
1611 /// # Arguments
1612 ///
1613 /// - `connection`: The `JrConnection` to spawn (typically created via `JrConnectionBuilder::connect_to()`)
1614 /// - `serve_future`: A function that drives the connection (usually `|c| Box::pin(c.serve())`)
1615 ///
1616 /// # Returns
1617 ///
1618 /// A `JrConnectionCx` that you can use to send requests and notifications to the spawned connection.
1619 ///
1620 /// # Example: Proxying to a backend connection
1621 ///
1622 /// ```
1623 /// # use sacp::link::UntypedLink;
1624 /// # use sacp::{JrConnectionBuilder, JrConnectionCx};
1625 /// # use sacp_test::*;
1626 /// # async fn example(cx: JrConnectionCx<UntypedLink>) -> Result<(), sacp::Error> {
1627 /// // Set up a backend connection
1628 /// let backend = UntypedLink::builder()
1629 /// .on_receive_request(async |req: MyRequest, request_cx, _cx| {
1630 /// request_cx.respond(MyResponse { status: "ok".into() })
1631 /// }, sacp::on_receive_request!())
1632 /// .connect_to(MockTransport)?;
1633 ///
1634 /// // Spawn it and get a context to send requests to it
1635 /// let backend_cx = cx.spawn_connection(backend, |c| Box::pin(c.serve()))?;
1636 ///
1637 /// // Now you can forward requests to the backend
1638 /// let response = backend_cx.send_request(MyRequest {}).block_task().await?;
1639 /// # Ok(())
1640 /// # }
1641 /// ```
1642 #[track_caller]
1643 pub fn spawn_connection<H: JrMessageHandler, R: JrResponder<H::Link> + 'static>(
1644 &self,
1645 connection: JrConnection<H, R>,
1646 serve_future: impl FnOnce(JrConnection<H, R>) -> BoxFuture<'static, Result<(), crate::Error>>,
1647 ) -> Result<JrConnectionCx<H::Link>, crate::Error> {
1648 let cx = connection.cx.clone();
1649 let future = serve_future(connection);
1650 Task::new(std::panic::Location::caller(), future).spawn(&self.task_tx)?;
1651 Ok(cx)
1652 }
1653
1654 /// Send a request/notification and forward the response appropriately.
1655 ///
1656 /// The request context's response type matches the request's response type,
1657 /// enabling type-safe message forwarding.
1658 pub fn send_proxied_message_to<
1659 Peer: JrPeer,
1660 Req: JrRequest<Response: Send>,
1661 Notif: JrNotification,
1662 >(
1663 &self,
1664 peer: Peer,
1665 message: MessageCx<Req, Notif>,
1666 ) -> Result<(), crate::Error>
1667 where
1668 Link: HasPeer<Peer>,
1669 {
1670 match message {
1671 MessageCx::Request(request, request_cx) => self
1672 .send_request_to(peer, request)
1673 .forward_to_request_cx(request_cx),
1674 MessageCx::Notification(notification) => self.send_notification_to(peer, notification),
1675 }
1676 }
1677
1678 /// Send an outgoing request and return a [`JrResponse`] for handling the reply.
1679 ///
1680 /// The returned [`JrResponse`] provides methods for receiving the response without
1681 /// blocking the event loop:
1682 ///
1683 /// * [`on_receiving_result`](JrResponse::on_receiving_result) - Schedule
1684 /// a callback to run when the response arrives (doesn't block the event loop)
1685 /// * [`block_task`](JrResponse::block_task) - Block the current task until the response
1686 /// arrives (only safe in spawned tasks, not in handlers)
1687 ///
1688 /// # Anti-Footgun Design
1689 ///
1690 /// The API intentionally makes it difficult to block on the result directly to prevent
1691 /// the common mistake of blocking the event loop while waiting for a response:
1692 ///
1693 /// ```compile_fail
1694 /// # use sacp_test::*;
1695 /// # async fn example(cx: sacp::JrConnectionCx<sacp::link::UntypedLink>) -> Result<(), sacp::Error> {
1696 /// // ❌ This doesn't compile - prevents blocking the event loop
1697 /// let response = cx.send_request(MyRequest {}).await?;
1698 /// # Ok(())
1699 /// # }
1700 /// ```
1701 ///
1702 /// ```no_run
1703 /// # use sacp_test::*;
1704 /// # async fn example(cx: sacp::JrConnectionCx<sacp::link::UntypedLink>) -> Result<(), sacp::Error> {
1705 /// // ✅ Option 1: Schedule callback (safe in handlers)
1706 /// cx.send_request(MyRequest {})
1707 /// .on_receiving_result(async |result| {
1708 /// // Handle the response
1709 /// Ok(())
1710 /// })?;
1711 ///
1712 /// // ✅ Option 2: Block in spawned task (safe because task is concurrent)
1713 /// cx.spawn({
1714 /// let cx = cx.clone();
1715 /// async move {
1716 /// let response = cx.send_request(MyRequest {})
1717 /// .block_task()
1718 /// .await?;
1719 /// // Process response...
1720 /// Ok(())
1721 /// }
1722 /// })?;
1723 /// # Ok(())
1724 /// # }
1725 /// ```
1726 /// Send an outgoing request to the default counterpart peer.
1727 ///
1728 /// This is a convenience method that uses the connection's default peer.
1729 /// For explicit control over the target peer, use [`send_request_to`](Self::send_request_to).
1730 pub fn send_request<Req: JrRequest>(&self, request: Req) -> JrResponse<Req::Response>
1731 where
1732 Link: HasDefaultPeer,
1733 {
1734 self.send_request_to(Link::DefaultPeer::default(), request)
1735 }
1736
1737 /// Send an outgoing request to a specific counterpart peer.
1738 ///
1739 /// The message will be transformed according to the link's [`HasPeer`](crate::HasPeer)
1740 /// implementation before being sent.
1741 pub fn send_request_to<Peer: JrPeer, Req: JrRequest>(
1742 &self,
1743 peer: Peer,
1744 request: Req,
1745 ) -> JrResponse<Req::Response>
1746 where
1747 Link: HasPeer<Peer>,
1748 {
1749 let method = request.method().to_string();
1750 let (response_tx, response_rx) = oneshot::channel();
1751 match Link::remote_style(peer).transform_outgoing_message(request) {
1752 Ok(untyped) => {
1753 // Transform the message for the target role
1754 let params = crate::util::json_cast(untyped.params).ok();
1755 let message = OutgoingMessage::Request {
1756 method: untyped.method.clone(),
1757 params,
1758 response_tx,
1759 };
1760
1761 match self.message_tx.unbounded_send(message) {
1762 Ok(()) => (),
1763 Err(error) => {
1764 let OutgoingMessage::Request {
1765 method,
1766 response_tx,
1767 ..
1768 } = error.into_inner()
1769 else {
1770 unreachable!();
1771 };
1772
1773 response_tx
1774 .send(ResponsePayload {
1775 result: Err(crate::util::internal_error(format!(
1776 "failed to send outgoing request `{method}"
1777 ))),
1778 ack_tx: None,
1779 })
1780 .unwrap();
1781 }
1782 }
1783 }
1784
1785 Err(err) => {
1786 response_tx
1787 .send(ResponsePayload {
1788 result: Err(crate::util::internal_error(format!(
1789 "failed to create untyped request for `{method}`: {err}"
1790 ))),
1791 ack_tx: None,
1792 })
1793 .unwrap();
1794 }
1795 }
1796
1797 JrResponse::new(method.clone(), self.task_tx.clone(), response_rx)
1798 .map(move |json| <Req::Response>::from_value(&method, json))
1799 }
1800
1801 /// Send an outgoing notification to the default counterpart peer (no reply expected).
1802 ///
1803 /// Notifications are fire-and-forget messages that don't have IDs and don't expect responses.
1804 /// This method sends the notification immediately and returns.
1805 ///
1806 /// This is a convenience method that uses the link's default peer.
1807 /// For explicit control over the target peer, use [`send_notification_to`](Self::send_notification_to).
1808 ///
1809 /// ```no_run
1810 /// # use sacp_test::*;
1811 /// # async fn example(cx: sacp::JrConnectionCx<sacp::link::UntypedLink>) -> Result<(), sacp::Error> {
1812 /// cx.send_notification(StatusUpdate {
1813 /// message: "Processing...".into(),
1814 /// })?;
1815 /// # Ok(())
1816 /// # }
1817 /// ```
1818 pub fn send_notification<N: JrNotification>(&self, notification: N) -> Result<(), crate::Error>
1819 where
1820 Link: HasDefaultPeer,
1821 {
1822 self.send_notification_to(Link::DefaultPeer::default(), notification)
1823 }
1824
1825 /// Send an outgoing notification to a specific counterpart peer (no reply expected).
1826 ///
1827 /// The message will be transformed according to the link's [`HasPeer`](crate::HasPeer)
1828 /// implementation before being sent.
1829 pub fn send_notification_to<Peer: JrPeer, N: JrNotification>(
1830 &self,
1831 peer: Peer,
1832 notification: N,
1833 ) -> Result<(), crate::Error>
1834 where
1835 Link: HasPeer<Peer>,
1836 {
1837 let remote_style = Link::remote_style(peer);
1838 tracing::debug!(
1839 role = std::any::type_name::<Link>(),
1840 peer = std::any::type_name::<Peer>(),
1841 notification_type = std::any::type_name::<N>(),
1842 ?remote_style,
1843 original_method = notification.method(),
1844 "send_notification_to"
1845 );
1846 let transformed = remote_style.transform_outgoing_message(notification)?;
1847 tracing::debug!(
1848 transformed_method = %transformed.method,
1849 "send_notification_to transformed"
1850 );
1851 let params = crate::util::json_cast(transformed.params).ok();
1852 send_raw_message(
1853 &self.message_tx,
1854 OutgoingMessage::Notification {
1855 method: transformed.method,
1856 params,
1857 },
1858 )
1859 }
1860
1861 /// Send an error notification (no reply expected).
1862 pub fn send_error_notification(&self, error: crate::Error) -> Result<(), crate::Error> {
1863 send_raw_message(&self.message_tx, OutgoingMessage::Error { error })
1864 }
1865
1866 /// Register a dynamic message handler, used to intercept messages specific to a particular session
1867 /// or some similar modal thing.
1868 ///
1869 /// Dynamic message handlers are called first for every incoming message.
1870 ///
1871 /// If they decline to handle the message, then the message is passed to the regular registered handlers.
1872 ///
1873 /// The handler will stay registered until the returned registration guard is dropped.
1874 pub fn add_dynamic_handler(
1875 &self,
1876 handler: impl JrMessageHandler<Link = Link> + 'static,
1877 ) -> Result<DynamicHandlerRegistration<Link>, crate::Error> {
1878 let uuid = Uuid::new_v4();
1879 self.dynamic_handler_tx
1880 .unbounded_send(DynamicHandlerMessage::AddDynamicHandler(
1881 uuid.clone(),
1882 Box::new(handler),
1883 ))
1884 .map_err(crate::util::internal_error)?;
1885
1886 Ok(DynamicHandlerRegistration::new(uuid, self.clone()))
1887 }
1888
1889 fn remove_dynamic_handler(&self, uuid: Uuid) {
1890 match self
1891 .dynamic_handler_tx
1892 .unbounded_send(DynamicHandlerMessage::RemoveDynamicHandler(uuid))
1893 {
1894 Ok(_) => (),
1895 Err(_) => ( /* ignore errors */),
1896 }
1897 }
1898}
1899
1900#[derive(Clone, Debug)]
1901pub struct DynamicHandlerRegistration<Link: JrLink> {
1902 uuid: Uuid,
1903 cx: JrConnectionCx<Link>,
1904}
1905
1906impl<Link: JrLink> DynamicHandlerRegistration<Link> {
1907 fn new(uuid: Uuid, cx: JrConnectionCx<Link>) -> Self {
1908 Self { uuid, cx }
1909 }
1910
1911 /// Prevents the dynamic handler from being removed when dropped.
1912 pub fn run_indefinitely(self) {
1913 std::mem::forget(self)
1914 }
1915}
1916
1917impl<Link: JrLink> Drop for DynamicHandlerRegistration<Link> {
1918 fn drop(&mut self) {
1919 self.cx.remove_dynamic_handler(self.uuid.clone());
1920 }
1921}
1922
1923/// The context to respond to an incoming request.
1924///
1925/// This context is provided to request handlers and serves a dual role:
1926///
1927/// 1. **Respond to the request** - Use [`respond`](Self::respond) or
1928/// [`respond_with_result`](Self::respond_with_result) to send the response
1929/// 2. **Send other messages** - Use the [`JrConnectionCx`] parameter passed to your
1930/// handler, which provides [`send_request`](`JrConnectionCx::send_request`),
1931/// [`send_notification`](`JrConnectionCx::send_notification`), and
1932/// [`spawn`](`JrConnectionCx::spawn`)
1933///
1934/// # Example
1935///
1936/// ```no_run
1937/// # use sacp_test::*;
1938/// # async fn example() -> Result<(), sacp::Error> {
1939/// # let connection = mock_connection();
1940/// connection.on_receive_request(async |req: ProcessRequest, request_cx, cx| {
1941/// // Send a notification while processing
1942/// cx.send_notification(StatusUpdate {
1943/// message: "processing".into(),
1944/// })?;
1945///
1946/// // Do some work...
1947/// let result = process(&req.data)?;
1948///
1949/// // Respond to the request
1950/// request_cx.respond(ProcessResponse { result })
1951/// }, sacp::on_receive_request!())
1952/// # .serve(sacp_test::MockTransport).await?;
1953/// # Ok(())
1954/// # }
1955/// ```
1956///
1957/// # Event Loop Considerations
1958///
1959/// Like all handlers, request handlers run on the event loop. Use
1960/// [`spawn`](JrConnectionCx::spawn) for expensive operations to avoid blocking
1961/// the connection.
1962///
1963/// See the [Event Loop and Concurrency](JrConnection#event-loop-and-concurrency)
1964/// section for more details.
1965#[must_use]
1966pub struct JrRequestCx<T: JrResponsePayload = serde_json::Value> {
1967 /// The context to use to send outgoing messages and replies.
1968 message_tx: OutgoingMessageTx,
1969 /// The method of the request.
1970 method: String,
1971
1972 /// The `id` of the message we are replying to.
1973 id: jsonrpcmsg::Id,
1974
1975 /// Function to send the response `T` to a request with the given method and id.
1976 make_json: SendBoxFnOnce<
1977 'static,
1978 (String, Result<T, crate::Error>),
1979 Result<serde_json::Value, crate::Error>,
1980 >,
1981}
1982
1983impl<T: JrResponsePayload> std::fmt::Debug for JrRequestCx<T> {
1984 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1985 f.debug_struct("JrRequestCx")
1986 .field("method", &self.method)
1987 .field("id", &self.id)
1988 .field("response_type", &std::any::type_name::<T>())
1989 .finish()
1990 }
1991}
1992
1993impl JrRequestCx<serde_json::Value> {
1994 /// Create a new method context.
1995 fn new(message_tx: OutgoingMessageTx, method: String, id: jsonrpcmsg::Id) -> Self {
1996 Self {
1997 message_tx,
1998 method,
1999 id,
2000 make_json: SendBoxFnOnce::new(move |_method, value| value),
2001 }
2002 }
2003
2004 /// Cast this request context to a different response type
2005 pub fn cast<T: JrResponsePayload>(self) -> JrRequestCx<T> {
2006 self.wrap_params(move |method, value| match value {
2007 Ok(value) => T::into_json(value, &method),
2008 Err(e) => Err(e),
2009 })
2010 }
2011}
2012
2013impl<T: JrResponsePayload> JrRequestCx<T> {
2014 /// Method of the incoming request
2015 pub fn method(&self) -> &str {
2016 &self.method
2017 }
2018
2019 /// ID of the incoming request as a JSON value
2020 pub fn id(&self) -> serde_json::Value {
2021 match &self.id {
2022 jsonrpcmsg::Id::Number(n) => serde_json::Value::Number((*n).into()),
2023 jsonrpcmsg::Id::String(s) => serde_json::Value::String(s.clone()),
2024 jsonrpcmsg::Id::Null => serde_json::Value::Null,
2025 }
2026 }
2027
2028 /// Convert to a `JrRequestCx` that expects a JSON value
2029 /// and which checks (dynamically) that the JSON value it receives
2030 /// can be converted to `T`.
2031 pub fn erase_to_json(self) -> JrRequestCx<serde_json::Value> {
2032 self.wrap_params(|method, value| T::from_value(&method, value?))
2033 }
2034
2035 /// Return a new JrResponse that expects a response of type U and serializes it.
2036 pub fn wrap_method(self, method: String) -> JrRequestCx<T> {
2037 JrRequestCx {
2038 message_tx: self.message_tx,
2039 method,
2040 id: self.id,
2041 make_json: self.make_json,
2042 }
2043 }
2044
2045 /// Return a new JrResponse that expects a response of type U and serializes it.
2046 ///
2047 /// `wrap_fn` will be invoked with the method name and the result of the wrapped function.
2048 pub fn wrap_params<U: JrResponsePayload>(
2049 self,
2050 wrap_fn: impl FnOnce(&str, Result<U, crate::Error>) -> Result<T, crate::Error> + Send + 'static,
2051 ) -> JrRequestCx<U> {
2052 JrRequestCx {
2053 message_tx: self.message_tx,
2054 method: self.method,
2055 id: self.id,
2056 make_json: SendBoxFnOnce::new(move |method: String, input: Result<U, crate::Error>| {
2057 let t_value = wrap_fn(&method, input);
2058 self.make_json.call(method, t_value)
2059 }),
2060 }
2061 }
2062
2063 /// Respond to the JSON-RPC request with either a value (`Ok`) or an error (`Err`).
2064 pub fn respond_with_result(
2065 self,
2066 response: Result<T, crate::Error>,
2067 ) -> Result<(), crate::Error> {
2068 tracing::debug!(id = ?self.id, "respond called");
2069 let json = self.make_json.call_tuple((self.method.clone(), response));
2070 send_raw_message(
2071 &self.message_tx,
2072 OutgoingMessage::Response {
2073 id: self.id,
2074 response: json,
2075 },
2076 )
2077 }
2078
2079 /// Respond to the JSON-RPC request with a value.
2080 pub fn respond(self, response: T) -> Result<(), crate::Error> {
2081 self.respond_with_result(Ok(response))
2082 }
2083
2084 /// Respond to the JSON-RPC request with an internal error containing a message.
2085 pub fn respond_with_internal_error(self, message: impl ToString) -> Result<(), crate::Error> {
2086 self.respond_with_error(crate::util::internal_error(message))
2087 }
2088
2089 /// Respond to the JSON-RPC request with an error.
2090 pub fn respond_with_error(self, error: crate::Error) -> Result<(), crate::Error> {
2091 tracing::debug!(id = ?self.id, ?error, "respond_with_error called");
2092 self.respond_with_result(Err(error))
2093 }
2094}
2095
2096/// Common bounds for any JSON-RPC message.
2097///
2098/// # Derive Macro
2099///
2100/// For simple message types, you can use the `JrRequest` or `JrNotification` derive macros
2101/// which will implement both `JrMessage` and the respective trait. See [`JrRequest`] and
2102/// [`JrNotification`] for examples.
2103pub trait JrMessage: 'static + Debug + Sized + Send + Clone {
2104 /// The method name for the message.
2105 fn method(&self) -> &str;
2106
2107 /// Convert this message into an untyped message.
2108 fn to_untyped_message(&self) -> Result<UntypedMessage, crate::Error>;
2109
2110 /// Attempt to parse this type from a method name and parameters.
2111 ///
2112 /// Returns:
2113 /// - `None` if this type does not recognize the method name
2114 /// - `Some(Ok(value))` if the method is recognized and deserialization succeeds
2115 /// - `Some(Err(error))` if the method is recognized but deserialization fails
2116 fn parse_message(method: &str, params: &impl Serialize) -> Option<Result<Self, crate::Error>>;
2117}
2118
2119/// Defines the "payload" of a successful response to a JSON-RPC request.
2120///
2121/// # Derive Macro
2122///
2123/// Use `#[derive(JrResponsePayload)]` to automatically implement this trait:
2124///
2125/// ```ignore
2126/// use sacp::JrResponsePayload;
2127/// use serde::{Serialize, Deserialize};
2128///
2129/// #[derive(Debug, Serialize, Deserialize, JrResponsePayload)]
2130/// struct HelloResponse {
2131/// greeting: String,
2132/// }
2133/// ```
2134pub trait JrResponsePayload: 'static + Debug + Sized + Send {
2135 /// Convert this message into a JSON value.
2136 fn into_json(self, method: &str) -> Result<serde_json::Value, crate::Error>;
2137
2138 /// Parse a JSON value into the response type.
2139 fn from_value(method: &str, value: serde_json::Value) -> Result<Self, crate::Error>;
2140}
2141
2142impl JrResponsePayload for serde_json::Value {
2143 fn from_value(_method: &str, value: serde_json::Value) -> Result<Self, crate::Error> {
2144 Ok(value)
2145 }
2146
2147 fn into_json(self, _method: &str) -> Result<serde_json::Value, crate::Error> {
2148 Ok(self)
2149 }
2150}
2151
2152/// A struct that represents a notification (JSON-RPC message that does not expect a response).
2153///
2154/// # Derive Macro
2155///
2156/// Use `#[derive(JrNotification)]` to automatically implement both `JrMessage` and `JrNotification`:
2157///
2158/// ```ignore
2159/// use sacp::JrNotification;
2160/// use serde::{Serialize, Deserialize};
2161///
2162/// #[derive(Debug, Clone, Serialize, Deserialize, JrNotification)]
2163/// #[notification(method = "_ping")]
2164/// struct PingNotification {
2165/// timestamp: u64,
2166/// }
2167/// ```
2168pub trait JrNotification: JrMessage {}
2169
2170/// A struct that represents a request (JSON-RPC message expecting a response).
2171///
2172/// # Derive Macro
2173///
2174/// Use `#[derive(JrRequest)]` to automatically implement both `JrMessage` and `JrRequest`:
2175///
2176/// ```ignore
2177/// use sacp::{JrRequest, JrResponsePayload};
2178/// use serde::{Serialize, Deserialize};
2179///
2180/// #[derive(Debug, Clone, Serialize, Deserialize, JrRequest)]
2181/// #[request(method = "_hello", response = HelloResponse)]
2182/// struct HelloRequest {
2183/// name: String,
2184/// }
2185///
2186/// #[derive(Debug, Serialize, Deserialize, JrResponsePayload)]
2187/// struct HelloResponse {
2188/// greeting: String,
2189/// }
2190/// ```
2191pub trait JrRequest: JrMessage {
2192 /// The type of data expected in response.
2193 type Response: JrResponsePayload;
2194}
2195
2196/// An enum capturing an in-flight request or notification.
2197/// In the case of a request, also includes the context used to respond to the request.
2198///
2199/// Type parameters allow specifying the concrete request and notification types.
2200/// By default, both are `UntypedMessage` for dynamic dispatch.
2201/// The request context's response type matches the request's response type.
2202#[derive(Debug)]
2203pub enum MessageCx<Req: JrRequest = UntypedMessage, Notif: JrMessage = UntypedMessage> {
2204 /// Incoming request and the context where the response should be sent.
2205 Request(Req, JrRequestCx<Req::Response>),
2206
2207 /// Incoming notification.
2208 Notification(Notif),
2209}
2210
2211impl<Req: JrRequest, Notif: JrMessage> MessageCx<Req, Notif> {
2212 /// Map the request and notification types to new types.
2213 pub fn map<Req1, Notif1>(
2214 self,
2215 map_request: impl FnOnce(Req, JrRequestCx<Req::Response>) -> (Req1, JrRequestCx<Req1::Response>),
2216 map_notification: impl FnOnce(Notif) -> Notif1,
2217 ) -> MessageCx<Req1, Notif1>
2218 where
2219 Req1: JrRequest<Response: Send>,
2220 Notif1: JrMessage,
2221 {
2222 match self {
2223 MessageCx::Request(request, cx) => {
2224 let (new_request, new_cx) = map_request(request, cx);
2225 MessageCx::Request(new_request, new_cx)
2226 }
2227 MessageCx::Notification(notification) => {
2228 let new_notification = map_notification(notification);
2229 MessageCx::Notification(new_notification)
2230 }
2231 }
2232 }
2233
2234 /// Respond to the message with an error.
2235 ///
2236 /// If this message is a request, this error becomes the reply to the request.
2237 ///
2238 /// If this message is a notification, the error is sent as a notification.
2239 pub fn respond_with_error<Link: JrLink>(
2240 self,
2241 error: crate::Error,
2242 cx: JrConnectionCx<Link>,
2243 ) -> Result<(), crate::Error> {
2244 match self {
2245 MessageCx::Request(_, request_cx) => request_cx.respond_with_error(error),
2246 MessageCx::Notification(_) => cx.send_error_notification(error),
2247 }
2248 }
2249
2250 /// Convert to a `JrRequestCx` that expects a JSON value
2251 /// and which checks (dynamically) that the JSON value it receives
2252 /// can be converted to `T`.
2253 pub fn erase_to_json(self) -> Result<MessageCx, crate::Error> {
2254 match self {
2255 MessageCx::Request(response, request_cx) => Ok(MessageCx::Request(
2256 response.to_untyped_message()?,
2257 request_cx.erase_to_json(),
2258 )),
2259 MessageCx::Notification(notification) => {
2260 Ok(MessageCx::Notification(notification.to_untyped_message()?))
2261 }
2262 }
2263 }
2264
2265 /// Convert the message in self to an untyped message.
2266 pub fn to_untyped_message(&self) -> Result<UntypedMessage, crate::Error> {
2267 match self {
2268 MessageCx::Request(request, _) => request.to_untyped_message(),
2269 MessageCx::Notification(notification) => notification.to_untyped_message(),
2270 }
2271 }
2272
2273 /// Convert self to an untyped message context.
2274 pub fn into_untyped_message_cx(self) -> Result<MessageCx, crate::Error> {
2275 match self {
2276 MessageCx::Request(request, request_cx) => Ok(MessageCx::Request(
2277 request.to_untyped_message()?,
2278 request_cx.erase_to_json(),
2279 )),
2280 MessageCx::Notification(notification) => {
2281 Ok(MessageCx::Notification(notification.to_untyped_message()?))
2282 }
2283 }
2284 }
2285
2286 /// Returns the request ID if this is a request, None if notification.
2287 pub fn id(&self) -> Option<serde_json::Value> {
2288 match self {
2289 MessageCx::Request(_, cx) => Some(cx.id()),
2290 MessageCx::Notification(_) => None,
2291 }
2292 }
2293
2294 /// Returns the method of the message (only available for UntypedMessage).
2295 pub fn method(&self) -> &str {
2296 match self {
2297 MessageCx::Request(msg, _) => msg.method(),
2298 MessageCx::Notification(msg) => msg.method(),
2299 }
2300 }
2301}
2302
2303impl MessageCx {
2304 /// Attempts to parse `self` into a typed message context.
2305 ///
2306 /// # Returns
2307 ///
2308 /// * `Ok(Ok(typed))` if this is a request/notification of the given types
2309 /// * `Ok(Err(self))` if not
2310 /// * `Err` if has the correct method for the given types but parsing fails
2311 pub(crate) fn into_typed_message_cx<Req: JrRequest, Notif: JrNotification>(
2312 self,
2313 ) -> Result<Result<MessageCx<Req, Notif>, MessageCx>, crate::Error> {
2314 match self {
2315 MessageCx::Request(message, request_cx) => {
2316 tracing::debug!(
2317 request_type = std::any::type_name::<Req>(),
2318 message = ?message,
2319 "MessageHandler::handle_request"
2320 );
2321 match Req::parse_message(&message.method, &message.params) {
2322 Some(Ok(req)) => {
2323 tracing::trace!(?req, "MessageHandler::handle_request: parse completed");
2324 Ok(Ok(MessageCx::Request(req, request_cx.cast())))
2325 }
2326 Some(Err(err)) => {
2327 tracing::trace!(?err, "MessageHandler::handle_request: parse errored");
2328 return Err(err);
2329 }
2330 None => {
2331 tracing::trace!("MessageHandler::handle_request: parse failed");
2332 Ok(Err(MessageCx::Request(message, request_cx)))
2333 }
2334 }
2335 }
2336
2337 MessageCx::Notification(message) => {
2338 tracing::debug!(
2339 notification_type = std::any::type_name::<Notif>(),
2340 message = ?message,
2341 "MessageHandler::handle_notification"
2342 );
2343 match Notif::parse_message(&message.method, &message.params) {
2344 Some(Ok(notif)) => {
2345 tracing::trace!(
2346 ?notif,
2347 "MessageHandler::handle_notification: parse completed"
2348 );
2349 Ok(Ok(MessageCx::Notification(notif)))
2350 }
2351 Some(Err(err)) => {
2352 tracing::trace!(?err, "MessageHandler: parse errored");
2353 Err(err)
2354 }
2355 None => {
2356 tracing::trace!("MessageHandler: parse failed");
2357 Ok(Err(MessageCx::Notification(message)))
2358 }
2359 }
2360 }
2361 }
2362 }
2363
2364 /// True if this message has a session-id field
2365 pub fn has_field(&self, field_name: &str) -> bool {
2366 self.message().params().get(field_name).is_some()
2367 }
2368
2369 /// Extract the ACP session-id from this message (if any).
2370 pub(crate) fn has_session_id(&self) -> bool {
2371 self.has_field("sessionId")
2372 }
2373
2374 /// Extract the ACP session-id from this message (if any).
2375 pub(crate) fn get_session_id(&self) -> Result<Option<SessionId>, crate::Error> {
2376 let value = match self.message().params().get("sessionId") {
2377 Some(value) => value,
2378 None => return Ok(None),
2379 };
2380 let session_id = serde_json::from_value(value.clone())?;
2381 Ok(Some(session_id))
2382 }
2383
2384 /// Try to parse this as a notification of the given type.
2385 ///
2386 /// # Returns
2387 ///
2388 /// * `Ok(Ok(typed))` if this is a request/notification of the given types
2389 /// * `Ok(Err(self))` if not
2390 /// * `Err` if has the correct method for the given types but parsing fails
2391 pub fn into_notification<N: JrNotification>(
2392 self,
2393 ) -> Result<Result<N, MessageCx>, crate::Error> {
2394 match self {
2395 MessageCx::Request(..) => Ok(Err(self)),
2396 MessageCx::Notification(msg) => match N::parse_message(&msg.method, &msg.params) {
2397 Some(Ok(n)) => Ok(Ok(n)),
2398 Some(Err(err)) => Err(err),
2399 None => Ok(Err(MessageCx::Notification(msg))),
2400 },
2401 }
2402 }
2403
2404 /// Try to parse this as a request of the given type.
2405 ///
2406 /// # Returns
2407 ///
2408 /// * `Ok(Ok(typed))` if this is a request/notification of the given types
2409 /// * `Ok(Err(self))` if not
2410 /// * `Err` if has the correct method for the given types but parsing fails
2411 pub fn into_request<Req: JrRequest>(
2412 self,
2413 ) -> Result<Result<(Req, JrRequestCx<Req::Response>), MessageCx>, crate::Error> {
2414 match self {
2415 MessageCx::Request(msg, request_cx) => {
2416 match Req::parse_message(&msg.method, &msg.params) {
2417 Some(Ok(req)) => Ok(Ok((req, request_cx.cast()))),
2418 Some(Err(err)) => Err(err),
2419 None => Ok(Err(MessageCx::Request(msg, request_cx))),
2420 }
2421 }
2422 MessageCx::Notification(..) => Ok(Err(self)),
2423 }
2424 }
2425}
2426
2427impl<M: JrRequest + JrNotification> MessageCx<M, M> {
2428 /// Returns the message of the message (only available for UntypedMessage).
2429 pub fn message(&self) -> &M {
2430 match self {
2431 MessageCx::Request(msg, _) => msg,
2432 MessageCx::Notification(msg) => msg,
2433 }
2434 }
2435
2436 /// Map the request/notification message.
2437 pub(crate) fn try_map_message(
2438 self,
2439 map_message: impl FnOnce(M) -> Result<M, crate::Error>,
2440 ) -> Result<MessageCx<M, M>, crate::Error> {
2441 match self {
2442 MessageCx::Request(request, cx) => Ok(MessageCx::Request(map_message(request)?, cx)),
2443 MessageCx::Notification(notification) => {
2444 Ok(MessageCx::<M, M>::Notification(map_message(notification)?))
2445 }
2446 }
2447 }
2448}
2449
2450/// An incoming JSON message without any typing. Can be a request or a notification.
2451#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
2452pub struct UntypedMessage {
2453 /// The JSON-RPC method name
2454 pub method: String,
2455 /// The JSON-RPC parameters as a raw JSON value
2456 pub params: serde_json::Value,
2457}
2458
2459impl UntypedMessage {
2460 /// Returns an untyped message with the given method and parameters.
2461 pub fn new(method: &str, params: impl Serialize) -> Result<Self, crate::Error> {
2462 let params = serde_json::to_value(params)?;
2463 Ok(Self {
2464 method: method.to_string(),
2465 params,
2466 })
2467 }
2468
2469 /// Returns the method name
2470 pub fn method(&self) -> &str {
2471 &self.method
2472 }
2473
2474 /// Returns the parameters as a JSON value
2475 pub fn params(&self) -> &serde_json::Value {
2476 &self.params
2477 }
2478
2479 /// Consumes this message and returns the method and params
2480 pub fn into_parts(self) -> (String, serde_json::Value) {
2481 (self.method, self.params)
2482 }
2483}
2484
2485impl JrMessage for UntypedMessage {
2486 fn method(&self) -> &str {
2487 &self.method
2488 }
2489
2490 fn to_untyped_message(&self) -> Result<UntypedMessage, crate::Error> {
2491 Ok(self.clone())
2492 }
2493
2494 fn parse_message(method: &str, params: &impl Serialize) -> Option<Result<Self, crate::Error>> {
2495 Some(UntypedMessage::new(method, params))
2496 }
2497}
2498
2499impl JrRequest for UntypedMessage {
2500 type Response = serde_json::Value;
2501}
2502
2503impl JrNotification for UntypedMessage {}
2504
2505/// Represents a pending response of type `R` from an outgoing request.
2506///
2507/// Returned by [`JrConnectionCx::send_request`], this type provides methods for handling
2508/// the response without blocking the event loop. The API is intentionally designed to make
2509/// it difficult to accidentally block.
2510///
2511/// # Anti-Footgun Design
2512///
2513/// You cannot directly `.await` a `JrResponse`. Instead, you must choose how to handle
2514/// the response:
2515///
2516/// ## Option 1: Schedule a Callback (Safe in Handlers)
2517///
2518/// Use [`on_receiving_result`](Self::on_receiving_result) to schedule a task
2519/// that runs when the response arrives. This doesn't block the event loop:
2520///
2521/// ```no_run
2522/// # use sacp_test::*;
2523/// # async fn example(cx: sacp::JrConnectionCx<sacp::link::UntypedLink>) -> Result<(), sacp::Error> {
2524/// cx.send_request(MyRequest {})
2525/// .on_receiving_result(async |result| {
2526/// match result {
2527/// Ok(response) => {
2528/// // Handle successful response
2529/// Ok(())
2530/// }
2531/// Err(error) => {
2532/// // Handle error
2533/// Err(error)
2534/// }
2535/// }
2536/// })?;
2537/// # Ok(())
2538/// # }
2539/// ```
2540///
2541/// ## Option 2: Block in a Spawned Task (Safe Only in `spawn`)
2542///
2543/// Use [`block_task`](Self::block_task) to block until the response arrives, but **only**
2544/// in a spawned task (never in a handler):
2545///
2546/// ```no_run
2547/// # use sacp_test::*;
2548/// # async fn example(cx: sacp::JrConnectionCx<sacp::link::UntypedLink>) -> Result<(), sacp::Error> {
2549/// // ✅ Safe: Spawned task runs concurrently
2550/// cx.spawn({
2551/// let cx = cx.clone();
2552/// async move {
2553/// let response = cx.send_request(MyRequest {})
2554/// .block_task()
2555/// .await?;
2556/// // Process response...
2557/// Ok(())
2558/// }
2559/// })?;
2560/// # Ok(())
2561/// # }
2562/// ```
2563///
2564/// ```no_run
2565/// # use sacp_test::*;
2566/// # async fn example() -> Result<(), sacp::Error> {
2567/// # let connection = mock_connection();
2568/// // ❌ NEVER do this in a handler - blocks the event loop!
2569/// connection.on_receive_request(async |req: MyRequest, request_cx, cx| {
2570/// let response = cx.send_request(MyRequest {})
2571/// .block_task() // This will deadlock!
2572/// .await?;
2573/// request_cx.respond(response)
2574/// }, sacp::on_receive_request!())
2575/// # .serve(sacp_test::MockTransport).await?;
2576/// # Ok(())
2577/// # }
2578/// ```
2579///
2580/// # Why This Design?
2581///
2582/// If you block the event loop while waiting for a response, the connection cannot process
2583/// the incoming response message, creating a deadlock. This API design prevents that footgun
2584/// by making blocking explicit and encouraging non-blocking patterns.
2585pub struct JrResponse<T> {
2586 method: String,
2587 task_tx: TaskTx,
2588 response_rx: oneshot::Receiver<ResponsePayload>,
2589 to_result: Box<dyn Fn(serde_json::Value) -> Result<T, crate::Error> + Send>,
2590}
2591
2592impl JrResponse<serde_json::Value> {
2593 fn new(
2594 method: String,
2595 task_tx: mpsc::UnboundedSender<Task>,
2596 response_rx: oneshot::Receiver<ResponsePayload>,
2597 ) -> Self {
2598 Self {
2599 method,
2600 response_rx,
2601 task_tx,
2602 to_result: Box::new(Ok),
2603 }
2604 }
2605}
2606
2607impl<T: JrResponsePayload> JrResponse<T> {
2608 /// The method of the request this is in response to.
2609 pub fn method(&self) -> &str {
2610 &self.method
2611 }
2612
2613 /// Create a new response that maps the result of the response to a new type.
2614 pub fn map<U>(
2615 self,
2616 map_fn: impl Fn(T) -> Result<U, crate::Error> + 'static + Send,
2617 ) -> JrResponse<U> {
2618 JrResponse {
2619 method: self.method,
2620 response_rx: self.response_rx,
2621 task_tx: self.task_tx,
2622 to_result: Box::new(move |value| map_fn((self.to_result)(value)?)),
2623 }
2624 }
2625
2626 /// Forward the response (success or error) to a request context when it arrives.
2627 ///
2628 /// This is a convenience method for proxying messages between connections. When the
2629 /// response arrives, it will be automatically sent to the provided request context,
2630 /// whether it's a successful response or an error.
2631 ///
2632 /// # Example: Proxying requests
2633 ///
2634 /// ```
2635 /// # use sacp::link::UntypedLink;
2636 /// # use sacp::{JrConnectionBuilder, JrConnectionCx};
2637 /// # use sacp_test::*;
2638 /// # async fn example(cx: JrConnectionCx<UntypedLink>) -> Result<(), sacp::Error> {
2639 /// // Set up backend connection
2640 /// let backend = UntypedLink::builder()
2641 /// .on_receive_request(async |req: MyRequest, request_cx, cx| {
2642 /// request_cx.respond(MyResponse { status: "ok".into() })
2643 /// }, sacp::on_receive_request!())
2644 /// .connect_to(MockTransport)?;
2645 ///
2646 /// // Spawn backend and get a context to send to it
2647 /// let backend_cx = cx.spawn_connection(backend, |c| Box::pin(c.serve()))?;
2648 ///
2649 /// // Set up proxy that forwards requests to backend
2650 /// UntypedLink::builder()
2651 /// .on_receive_request({
2652 /// let backend_cx = backend_cx.clone();
2653 /// async move |req: MyRequest, request_cx, cx| {
2654 /// // Forward the request to backend and proxy the response back
2655 /// backend_cx.send_request(req)
2656 /// .forward_to_request_cx(request_cx)?;
2657 /// Ok(())
2658 /// }
2659 /// }, sacp::on_receive_request!());
2660 /// # Ok(())
2661 /// # }
2662 /// ```
2663 ///
2664 /// # Type Safety
2665 ///
2666 /// The request context's response type must match the request's response type,
2667 /// ensuring type-safe message forwarding.
2668 ///
2669 /// # When to Use
2670 ///
2671 /// Use this when:
2672 /// - You're implementing a proxy or gateway pattern
2673 /// - You want to forward responses without processing them
2674 /// - The response types match between the outgoing request and incoming request
2675 ///
2676 /// This is equivalent to calling `on_receiving_result` and manually forwarding
2677 /// the result, but more concise.
2678 pub fn forward_to_request_cx(self, request_cx: JrRequestCx<T>) -> Result<(), crate::Error>
2679 where
2680 T: Send,
2681 {
2682 self.on_receiving_result(async move |result| request_cx.respond_with_result(result))
2683 }
2684
2685 /// Block the current task until the response is received.
2686 ///
2687 /// **Warning:** This method blocks the current async task. It is **only safe** to use
2688 /// in spawned tasks created with [`JrConnectionCx::spawn`]. Using it directly in a
2689 /// handler callback will deadlock the connection.
2690 ///
2691 /// # Safe Usage (in spawned tasks)
2692 ///
2693 /// ```no_run
2694 /// # use sacp_test::*;
2695 /// # async fn example() -> Result<(), sacp::Error> {
2696 /// # let connection = mock_connection();
2697 /// connection.on_receive_request(async |req: MyRequest, request_cx, cx| {
2698 /// // Spawn a task to handle the request
2699 /// cx.spawn({
2700 /// let connection_cx = cx.clone();
2701 /// async move {
2702 /// // Safe: We're in a spawned task, not blocking the event loop
2703 /// let response = connection_cx.send_request(OtherRequest {})
2704 /// .block_task()
2705 /// .await?;
2706 ///
2707 /// // Process the response...
2708 /// Ok(())
2709 /// }
2710 /// })?;
2711 ///
2712 /// // Respond immediately
2713 /// request_cx.respond(MyResponse { status: "ok".into() })
2714 /// }, sacp::on_receive_request!())
2715 /// # .serve(sacp_test::MockTransport).await?;
2716 /// # Ok(())
2717 /// # }
2718 /// ```
2719 ///
2720 /// # Unsafe Usage (in handlers - will deadlock!)
2721 ///
2722 /// ```no_run
2723 /// # use sacp_test::*;
2724 /// # async fn example() -> Result<(), sacp::Error> {
2725 /// # let connection = mock_connection();
2726 /// connection.on_receive_request(async |req: MyRequest, request_cx, cx| {
2727 /// // ❌ DEADLOCK: Handler blocks event loop, which can't process the response
2728 /// let response = cx.send_request(OtherRequest {})
2729 /// .block_task()
2730 /// .await?;
2731 ///
2732 /// request_cx.respond(MyResponse { status: response.value })
2733 /// }, sacp::on_receive_request!())
2734 /// # .serve(sacp_test::MockTransport).await?;
2735 /// # Ok(())
2736 /// # }
2737 /// ```
2738 ///
2739 /// # When to Use
2740 ///
2741 /// Use this method when:
2742 /// - You're in a spawned task (via [`JrConnectionCx::spawn`])
2743 /// - You need the response value to proceed with your logic
2744 /// - Linear control flow is more natural than callbacks
2745 ///
2746 /// For handler callbacks, use [`on_receiving_result`](Self::on_receiving_result) instead.
2747 pub async fn block_task(self) -> Result<T, crate::Error>
2748 where
2749 T: Send,
2750 {
2751 match self.response_rx.await {
2752 Ok(ResponsePayload {
2753 result: Ok(json_value),
2754 ack_tx,
2755 }) => {
2756 // Ack immediately - we're in a spawned task, so the dispatch loop
2757 // can continue while we process the value.
2758 if let Some(tx) = ack_tx {
2759 let _ = tx.send(());
2760 }
2761 match (self.to_result)(json_value) {
2762 Ok(value) => Ok(value),
2763 Err(err) => Err(err),
2764 }
2765 }
2766 Ok(ResponsePayload {
2767 result: Err(err),
2768 ack_tx,
2769 }) => {
2770 if let Some(tx) = ack_tx {
2771 let _ = tx.send(());
2772 }
2773 Err(err)
2774 }
2775 Err(err) => Err(crate::util::internal_error(format!(
2776 "response to `{}` never received: {}",
2777 self.method, err
2778 ))),
2779 }
2780 }
2781
2782 /// Schedule an async task to run when a successful response is received.
2783 ///
2784 /// This is a convenience wrapper around [`on_receiving_result`](Self::on_receiving_result)
2785 /// for the common pattern of forwarding errors to a request context while only processing
2786 /// successful responses.
2787 ///
2788 /// # Behavior
2789 ///
2790 /// - If the response is `Ok(value)`, your task receives the value and the request context
2791 /// - If the response is `Err(error)`, the error is automatically sent to `request_cx`
2792 /// and your task is not called
2793 ///
2794 /// # Example: Chaining requests
2795 ///
2796 /// ```no_run
2797 /// # use sacp_test::*;
2798 /// # async fn example() -> Result<(), sacp::Error> {
2799 /// # let connection = mock_connection();
2800 /// connection.on_receive_request(async |req: ValidateRequest, request_cx, cx| {
2801 /// // Send initial request
2802 /// cx.send_request(ValidateRequest { data: req.data.clone() })
2803 /// .on_receiving_ok_result(request_cx, async |validation, request_cx| {
2804 /// // Only runs if validation succeeded
2805 /// if validation.is_valid {
2806 /// // Respond to original request
2807 /// request_cx.respond(ValidateResponse { is_valid: true, error: None })
2808 /// } else {
2809 /// request_cx.respond_with_error(sacp::util::internal_error("validation failed"))
2810 /// }
2811 /// })?;
2812 ///
2813 /// Ok(())
2814 /// }, sacp::on_receive_request!())
2815 /// # .serve(sacp_test::MockTransport).await?;
2816 /// # Ok(())
2817 /// # }
2818 /// ```
2819 ///
2820 /// # Ordering
2821 ///
2822 /// Like [`on_receiving_result`](Self::on_receiving_result), the callback blocks the
2823 /// dispatch loop until it completes. See the [`ordering`](crate::concepts::ordering) module
2824 /// for details.
2825 ///
2826 /// # When to Use
2827 ///
2828 /// Use this when:
2829 /// - You need to respond to a request based on another request's result
2830 /// - You want errors to automatically propagate to the request context
2831 /// - You only care about the success case
2832 ///
2833 /// For more control over error handling, use [`on_receiving_result`](Self::on_receiving_result).
2834 #[track_caller]
2835 pub fn on_receiving_ok_result<F>(
2836 self,
2837 request_cx: JrRequestCx<T>,
2838 task: impl FnOnce(T, JrRequestCx<T>) -> F + 'static + Send,
2839 ) -> Result<(), crate::Error>
2840 where
2841 F: Future<Output = Result<(), crate::Error>> + 'static + Send,
2842 T: Send,
2843 {
2844 self.on_receiving_result(async move |result| match result {
2845 Ok(value) => task(value, request_cx).await,
2846 Err(err) => request_cx.respond_with_error(err),
2847 })
2848 }
2849
2850 /// Schedule an async task to run when the response is received.
2851 ///
2852 /// This is the recommended way to handle responses in handler callbacks, as it doesn't
2853 /// block the event loop. The task will be spawned automatically when the response arrives.
2854 ///
2855 /// # Example: Handle response in callback
2856 ///
2857 /// ```no_run
2858 /// # use sacp_test::*;
2859 /// # async fn example() -> Result<(), sacp::Error> {
2860 /// # let connection = mock_connection();
2861 /// connection.on_receive_request(async |req: MyRequest, request_cx, cx| {
2862 /// // Send a request and schedule a callback for the response
2863 /// cx.send_request(QueryRequest { id: 22 })
2864 /// .on_receiving_result({
2865 /// let connection_cx = cx.clone();
2866 /// async move |result| {
2867 /// match result {
2868 /// Ok(response) => {
2869 /// println!("Got response: {:?}", response);
2870 /// // Can send more messages here
2871 /// connection_cx.send_notification(QueryComplete {})?;
2872 /// Ok(())
2873 /// }
2874 /// Err(error) => {
2875 /// eprintln!("Request failed: {}", error);
2876 /// Err(error)
2877 /// }
2878 /// }
2879 /// }
2880 /// })?;
2881 ///
2882 /// // Handler continues immediately without waiting
2883 /// request_cx.respond(MyResponse { status: "processing".into() })
2884 /// }, sacp::on_receive_request!())
2885 /// # .serve(sacp_test::MockTransport).await?;
2886 /// # Ok(())
2887 /// # }
2888 /// ```
2889 ///
2890 /// # Ordering
2891 ///
2892 /// The callback runs as a spawned task, but the dispatch loop waits for it to complete
2893 /// before processing the next message. This gives you ordering guarantees: no other
2894 /// messages will be processed while your callback runs.
2895 ///
2896 /// This differs from [`block_task`](Self::block_task), which signals completion immediately
2897 /// upon receiving the response (before your code processes it).
2898 ///
2899 /// See the [`ordering`](crate::concepts::ordering) module for details on ordering guarantees
2900 /// and how to avoid deadlocks.
2901 ///
2902 /// # Error Handling
2903 ///
2904 /// If the scheduled task returns `Err`, the entire server will shut down. Make sure to handle
2905 /// errors appropriately within your task.
2906 ///
2907 /// # When to Use
2908 ///
2909 /// Use this method when:
2910 /// - You're in a handler callback (not a spawned task)
2911 /// - You want ordering guarantees (no other messages processed during your callback)
2912 /// - You need to do async work before "releasing" control back to the dispatch loop
2913 ///
2914 /// For spawned tasks where you don't need ordering guarantees, consider [`block_task`](Self::block_task).
2915 #[track_caller]
2916 pub fn on_receiving_result<F>(
2917 self,
2918 task: impl FnOnce(Result<T, crate::Error>) -> F + 'static + Send,
2919 ) -> Result<(), crate::Error>
2920 where
2921 F: Future<Output = Result<(), crate::Error>> + 'static + Send,
2922 T: Send,
2923 {
2924 let task_tx = self.task_tx.clone();
2925 let method = self.method;
2926 let response_rx = self.response_rx;
2927 let to_result = self.to_result;
2928 let location = Location::caller();
2929
2930 Task::new(location, async move {
2931 match response_rx.await {
2932 Ok(ResponsePayload { result, ack_tx }) => {
2933 // Convert the result using to_result for Ok values
2934 let typed_result = match result {
2935 Ok(json_value) => to_result(json_value),
2936 Err(err) => Err(err),
2937 };
2938
2939 // Run the user's callback
2940 let outcome = task(typed_result).await;
2941
2942 // Ack AFTER the callback completes - this is the key difference
2943 // from block_task. The dispatch loop waits for this ack.
2944 if let Some(tx) = ack_tx {
2945 let _ = tx.send(());
2946 }
2947
2948 outcome
2949 }
2950 Err(err) => Err(crate::util::internal_error(format!(
2951 "response to `{}` never received: {}",
2952 method, err
2953 ))),
2954 }
2955 })
2956 .spawn(&task_tx)
2957 }
2958}
2959
2960// ============================================================================
2961// IntoJrConnectionTransport Implementations
2962// ============================================================================
2963
2964/// A component that communicates over line streams.
2965///
2966/// `Lines` implements the [`Component`] trait for any pair of line-based streams
2967/// (a `Stream<Item = io::Result<String>>` for incoming and a `Sink<String>` for outgoing),
2968/// handling serialization of JSON-RPC messages to/from newline-delimited JSON.
2969///
2970/// This is a lower-level primitive than [`ByteStreams`] that enables interception and
2971/// transformation of individual lines before they are parsed or after they are serialized.
2972/// This is particularly useful for debugging, logging, or implementing custom line-based
2973/// protocols.
2974///
2975/// # Use Cases
2976///
2977/// - **Line-by-line logging**: Intercept and log each line before parsing
2978/// - **Custom protocols**: Transform lines before/after JSON-RPC processing
2979/// - **Debugging**: Inspect raw message strings
2980/// - **Line filtering**: Skip or modify specific messages
2981///
2982/// Most users should use [`ByteStreams`] instead, which provides a simpler interface
2983/// for byte-based I/O.
2984///
2985/// [`Component`]: crate::Component
2986pub struct Lines<OutgoingSink, IncomingStream> {
2987 /// Outgoing line sink (where we write serialized JSON-RPC messages)
2988 pub outgoing: OutgoingSink,
2989 /// Incoming line stream (where we read and parse JSON-RPC messages)
2990 pub incoming: IncomingStream,
2991}
2992
2993impl<OutgoingSink, IncomingStream> Lines<OutgoingSink, IncomingStream>
2994where
2995 OutgoingSink: futures::Sink<String, Error = std::io::Error> + Send + 'static,
2996 IncomingStream: futures::Stream<Item = std::io::Result<String>> + Send + 'static,
2997{
2998 /// Create a new line stream transport.
2999 pub fn new(outgoing: OutgoingSink, incoming: IncomingStream) -> Self {
3000 Self { outgoing, incoming }
3001 }
3002}
3003
3004impl<OutgoingSink, IncomingStream, L: JrLink> Component<L> for Lines<OutgoingSink, IncomingStream>
3005where
3006 OutgoingSink: futures::Sink<String, Error = std::io::Error> + Send + 'static,
3007 IncomingStream: futures::Stream<Item = std::io::Result<String>> + Send + 'static,
3008{
3009 async fn serve(self, client: impl Component<L::ConnectsTo>) -> Result<(), crate::Error> {
3010 let (channel, serve_self) = Component::<L>::into_server(self);
3011 match futures::future::select(Box::pin(client.serve(channel)), serve_self).await {
3012 Either::Left((result, _)) => result,
3013 Either::Right((result, _)) => result,
3014 }
3015 }
3016
3017 fn into_server(self) -> (Channel, BoxFuture<'static, Result<(), crate::Error>>) {
3018 let Self { outgoing, incoming } = self;
3019
3020 // Create a channel pair for the client to use
3021 let (channel_for_caller, channel_for_lines) = Channel::duplex();
3022
3023 // Create the server future that runs the line stream actors
3024 let server_future = Box::pin(async move {
3025 let Channel { rx, tx } = channel_for_lines;
3026
3027 // Run both actors concurrently
3028 let outgoing_future = transport_actor::transport_outgoing_lines_actor(rx, outgoing);
3029 let incoming_future = transport_actor::transport_incoming_lines_actor(incoming, tx);
3030
3031 // Wait for both to complete
3032 futures::try_join!(outgoing_future, incoming_future)?;
3033
3034 Ok(())
3035 });
3036
3037 (channel_for_caller, server_future)
3038 }
3039}
3040
3041/// A component that communicates over byte streams (stdin/stdout, sockets, pipes, etc.).
3042///
3043/// `ByteStreams` implements the [`Component`] trait for any pair of `AsyncRead` and `AsyncWrite`
3044/// streams, handling serialization of JSON-RPC messages to/from newline-delimited JSON.
3045/// This is the standard way to communicate with external processes or network connections.
3046///
3047/// # Use Cases
3048///
3049/// - **Stdio communication**: Connect to agents or proxies via stdin/stdout
3050/// - **Network sockets**: TCP, Unix domain sockets, or other stream-based protocols
3051/// - **Named pipes**: Cross-process communication on the same machine
3052/// - **File I/O**: Reading from and writing to file descriptors
3053///
3054/// # Example
3055///
3056/// Connecting to an agent via stdio:
3057///
3058/// ```no_run
3059/// use sacp::link::UntypedLink;
3060/// # use sacp::{ByteStreams};
3061/// use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
3062///
3063/// # async fn example() -> Result<(), sacp::Error> {
3064/// let component = ByteStreams::new(
3065/// tokio::io::stdout().compat_write(),
3066/// tokio::io::stdin().compat(),
3067/// );
3068///
3069/// // Use as a component in a connection
3070/// sacp::link::UntypedLink::builder()
3071/// .name("my-client")
3072/// .serve(component)
3073/// .await?;
3074/// # Ok(())
3075/// # }
3076/// ```
3077///
3078/// [`Component`]: crate::Component
3079pub struct ByteStreams<OB, IB> {
3080 /// Outgoing byte stream (where we write serialized messages)
3081 pub outgoing: OB,
3082 /// Incoming byte stream (where we read and parse messages)
3083 pub incoming: IB,
3084}
3085
3086impl<OB, IB> ByteStreams<OB, IB>
3087where
3088 OB: AsyncWrite + Send + 'static,
3089 IB: AsyncRead + Send + 'static,
3090{
3091 /// Create a new byte stream transport.
3092 pub fn new(outgoing: OB, incoming: IB) -> Self {
3093 Self { outgoing, incoming }
3094 }
3095}
3096
3097impl<OB, IB, L: JrLink> Component<L> for ByteStreams<OB, IB>
3098where
3099 OB: AsyncWrite + Send + 'static,
3100 IB: AsyncRead + Send + 'static,
3101{
3102 async fn serve(self, client: impl Component<L::ConnectsTo>) -> Result<(), crate::Error> {
3103 let (channel, serve_self) = Component::<L>::into_server(self);
3104 match futures::future::select(pin!(client.serve(channel)), serve_self).await {
3105 Either::Left((result, _)) => result,
3106 Either::Right((result, _)) => result,
3107 }
3108 }
3109
3110 fn into_server(self) -> (Channel, BoxFuture<'static, Result<(), crate::Error>>) {
3111 use futures::AsyncBufReadExt;
3112 use futures::AsyncWriteExt;
3113 use futures::io::BufReader;
3114 let Self { outgoing, incoming } = self;
3115
3116 // Convert byte streams to line streams
3117 // Box both streams to satisfy Unpin requirements
3118 let incoming_lines = Box::pin(BufReader::new(incoming).lines());
3119
3120 // Create a sink that writes lines (with newlines) to the outgoing byte stream
3121 // We need to Box the writer since it may not be Unpin
3122 let outgoing_sink =
3123 futures::sink::unfold(Box::pin(outgoing), async move |mut writer, line: String| {
3124 let mut bytes = line.into_bytes();
3125 bytes.push(b'\n');
3126 writer.write_all(&bytes).await?;
3127 Ok::<_, std::io::Error>(writer)
3128 });
3129
3130 // Delegate to Lines component
3131 Component::<L>::into_server(Lines::new(outgoing_sink, incoming_lines))
3132 }
3133}
3134
3135/// A channel endpoint representing one side of a bidirectional message channel.
3136///
3137/// `Channel` represents a single endpoint's view of a bidirectional communication channel.
3138/// Each endpoint has:
3139/// - `rx`: A receiver for incoming messages (or errors) from the counterpart
3140/// - `tx`: A sender for outgoing messages (or errors) to the counterpart
3141///
3142/// # Example
3143///
3144/// ```no_run
3145/// # use sacp::link::UntypedLink;
3146/// # use sacp::{Channel, JrConnectionBuilder};
3147/// # async fn example() -> Result<(), sacp::Error> {
3148/// // Create a pair of connected channels
3149/// let (channel_a, channel_b) = Channel::duplex();
3150///
3151/// // Each channel can be used by a different component
3152/// UntypedLink::builder()
3153/// .name("connection-a")
3154/// .serve(channel_a)
3155/// .await?;
3156/// # Ok(())
3157/// # }
3158/// ```
3159pub struct Channel {
3160 /// Receives messages (or errors) from the counterpart.
3161 pub rx: mpsc::UnboundedReceiver<Result<jsonrpcmsg::Message, crate::Error>>,
3162 /// Sends messages (or errors) to the counterpart.
3163 pub tx: mpsc::UnboundedSender<Result<jsonrpcmsg::Message, crate::Error>>,
3164}
3165
3166impl Channel {
3167 /// Create a pair of connected channel endpoints.
3168 ///
3169 /// Returns two `Channel` instances that are connected to each other:
3170 /// - Messages sent via `channel_a.tx` are received on `channel_b.rx`
3171 /// - Messages sent via `channel_b.tx` are received on `channel_a.rx`
3172 ///
3173 /// # Returns
3174 ///
3175 /// A tuple `(channel_a, channel_b)` of connected channel endpoints.
3176 pub fn duplex() -> (Self, Self) {
3177 // Create channels: A sends Result<Message> which B receives as Message
3178 let (a_tx, b_rx) = mpsc::unbounded();
3179 let (b_tx, a_rx) = mpsc::unbounded();
3180
3181 let channel_a = Self { rx: a_rx, tx: a_tx };
3182 let channel_b = Self { rx: b_rx, tx: b_tx };
3183
3184 (channel_a, channel_b)
3185 }
3186
3187 /// Copy messages from `rx` to `tx`.
3188 ///
3189 /// # Returns
3190 ///
3191 /// A `Result` indicating success or failure.
3192 pub async fn copy(mut self) -> Result<(), crate::Error> {
3193 while let Some(msg) = self.rx.next().await {
3194 self.tx
3195 .unbounded_send(msg)
3196 .map_err(crate::util::internal_error)?;
3197 }
3198 Ok(())
3199 }
3200}
3201
3202impl<L: JrLink> Component<L> for Channel {
3203 async fn serve(self, client: impl Component<L::ConnectsTo>) -> Result<(), crate::Error> {
3204 let (client_channel, client_serve) = client.into_server();
3205
3206 match futures::try_join!(
3207 Channel {
3208 rx: client_channel.rx,
3209 tx: self.tx
3210 }
3211 .copy(),
3212 Channel {
3213 rx: self.rx,
3214 tx: client_channel.tx
3215 }
3216 .copy(),
3217 client_serve
3218 ) {
3219 Ok(((), (), ())) => Ok(()),
3220 Err(err) => Err(err),
3221 }
3222 }
3223
3224 fn into_server(self) -> (Channel, BoxFuture<'static, Result<(), crate::Error>>) {
3225 (self, Box::pin(future::ready(Ok(()))))
3226 }
3227}