agent_client_protocol/
stream_broadcast.rs

1//! JSON-RPC Stream broadcasting for debugging and monitoring communication.
2//!
3//! This module provides functionality to observe the JSON-RPC message stream between
4//! clients and agents. It's primarily used for debugging, logging, and building
5//! development tools that need to monitor the protocol communication.
6
7use std::sync::Arc;
8
9use agent_client_protocol_schema::{
10    Error, Notification, OutgoingMessage, Request, RequestId, Response, Result, Side,
11};
12use derive_more::From;
13use serde::Serialize;
14use serde_json::value::RawValue;
15
16/// A message that flows through the RPC stream.
17///
18/// This represents any JSON-RPC message (request, response, or notification)
19/// along with its direction (incoming or outgoing).
20///
21/// Stream messages are used for observing and debugging the protocol communication
22/// without interfering with the actual message handling.
23#[derive(Debug, Clone, PartialEq, Eq)]
24pub struct StreamMessage {
25    /// The direction of the message relative to this side of the connection.
26    pub direction: StreamMessageDirection,
27    /// The actual content of the message.
28    pub message: StreamMessageContent,
29}
30
31/// The direction of a message in the RPC stream.
32#[derive(Debug, Clone, Copy, PartialEq, Eq)]
33pub enum StreamMessageDirection {
34    /// A message received from the other side of the connection.
35    Incoming,
36    /// A message sent to the other side of the connection.
37    Outgoing,
38}
39
40/// The content of a stream message.
41///
42/// This enum represents the three types of JSON-RPC messages:
43/// - Requests: Method calls that expect a response
44/// - Responses: Replies to previous requests
45/// - Notifications: One-way messages that don't expect a response
46#[derive(Debug, Clone, PartialEq, Eq)]
47pub enum StreamMessageContent {
48    /// A JSON-RPC request message.
49    Request {
50        /// The unique identifier for this request.
51        id: RequestId,
52        /// The name of the method being called.
53        method: Arc<str>,
54        /// Optional parameters for the method.
55        params: Option<serde_json::Value>,
56    },
57    /// A JSON-RPC response message.
58    Response {
59        /// The ID of the request this response is for.
60        id: RequestId,
61        /// The result of the request (success or error).
62        result: Result<Option<serde_json::Value>>,
63    },
64    /// A JSON-RPC notification message.
65    Notification {
66        /// The name of the notification method.
67        method: Arc<str>,
68        /// Optional parameters for the notification.
69        params: Option<serde_json::Value>,
70    },
71}
72
73/// A receiver for observing the message stream.
74///
75/// This allows you to receive copies of all messages flowing through the connection,
76/// useful for debugging, logging, or building development tools.
77///
78/// # Example
79///
80/// ```no_run
81/// use agent_client_protocol::{StreamReceiver, StreamMessageDirection};
82///
83/// async fn monitor_messages(mut receiver: StreamReceiver) {
84///     while let Ok(message) = receiver.recv().await {
85///         match message.direction {
86///             StreamMessageDirection::Incoming => println!("← Received: {:?}", message.message),
87///             StreamMessageDirection::Outgoing => println!("→ Sent: {:?}", message.message),
88///         }
89///     }
90/// }
91/// ```
92#[derive(Debug, From)]
93pub struct StreamReceiver(async_broadcast::Receiver<StreamMessage>);
94
95impl StreamReceiver {
96    /// Receives the next message from the stream.
97    ///
98    /// This method will wait until a message is available or the sender is dropped.
99    ///
100    /// # Returns
101    ///
102    /// - `Ok(StreamMessage)` when a message is received
103    /// - `Err` when the sender is dropped or the receiver is lagged
104    pub async fn recv(&mut self) -> Result<StreamMessage> {
105        self.0
106            .recv()
107            .await
108            .map_err(|e| Error::internal_error().data(e.to_string()))
109    }
110}
111
112/// Internal sender for broadcasting stream messages.
113///
114/// This is used internally by the RPC system to broadcast messages to all receivers.
115/// You typically won't interact with this directly.
116#[derive(Clone, Debug, From)]
117pub(crate) struct StreamSender(async_broadcast::Sender<StreamMessage>);
118
119impl StreamSender {
120    /// Broadcasts an outgoing message to all receivers.
121    pub(crate) fn outgoing<L: Side, R: Side>(&self, message: &OutgoingMessage<L, R>) {
122        if self.0.receiver_count() == 0 {
123            return;
124        }
125
126        let message = StreamMessage {
127            direction: StreamMessageDirection::Outgoing,
128            message: match message {
129                OutgoingMessage::Request(Request { id, method, params }) => {
130                    StreamMessageContent::Request {
131                        id: id.clone(),
132                        method: method.clone(),
133                        params: serde_json::to_value(params).ok(),
134                    }
135                }
136                OutgoingMessage::Response(Response::Result { id, result }) => {
137                    StreamMessageContent::Response {
138                        id: id.clone(),
139                        result: Ok(serde_json::to_value(result).ok()),
140                    }
141                }
142                OutgoingMessage::Response(Response::Error { id, error }) => {
143                    StreamMessageContent::Response {
144                        id: id.clone(),
145                        result: Err(error.clone()),
146                    }
147                }
148                OutgoingMessage::Notification(Notification { method, params }) => {
149                    StreamMessageContent::Notification {
150                        method: method.clone(),
151                        params: serde_json::to_value(params).ok(),
152                    }
153                }
154            },
155        };
156
157        self.0.try_broadcast(message).ok();
158    }
159
160    /// Broadcasts an incoming request to all receivers.
161    pub(crate) fn incoming_request(
162        &self,
163        id: RequestId,
164        method: impl Into<Arc<str>>,
165        params: &impl Serialize,
166    ) {
167        if self.0.receiver_count() == 0 {
168            return;
169        }
170
171        let message = StreamMessage {
172            direction: StreamMessageDirection::Incoming,
173            message: StreamMessageContent::Request {
174                id,
175                method: method.into(),
176                params: serde_json::to_value(params).ok(),
177            },
178        };
179
180        self.0.try_broadcast(message).ok();
181    }
182
183    /// Broadcasts an incoming response to all receivers.
184    pub(crate) fn incoming_response(
185        &self,
186        id: RequestId,
187        result: Result<Option<&RawValue>, &Error>,
188    ) {
189        if self.0.receiver_count() == 0 {
190            return;
191        }
192
193        let result = match result {
194            Ok(Some(value)) => Ok(serde_json::from_str(value.get()).ok()),
195            Ok(None) => Ok(None),
196            Err(err) => Err(err.clone()),
197        };
198
199        let message = StreamMessage {
200            direction: StreamMessageDirection::Incoming,
201            message: StreamMessageContent::Response { id, result },
202        };
203
204        self.0.try_broadcast(message).ok();
205    }
206
207    /// Broadcasts an incoming notification to all receivers.
208    pub(crate) fn incoming_notification(
209        &self,
210        method: impl Into<Arc<str>>,
211        params: &impl Serialize,
212    ) {
213        if self.0.receiver_count() == 0 {
214            return;
215        }
216
217        let message = StreamMessage {
218            direction: StreamMessageDirection::Incoming,
219            message: StreamMessageContent::Notification {
220                method: method.into(),
221                params: serde_json::to_value(params).ok(),
222            },
223        };
224
225        self.0.try_broadcast(message).ok();
226    }
227}
228
229/// A broadcast for observing RPC message streams.
230///
231/// This is used internally by the RPC connection to allow multiple receivers
232/// to observe the message stream.
233#[derive(Debug, Clone)]
234pub(crate) struct StreamBroadcast {
235    receiver: async_broadcast::InactiveReceiver<StreamMessage>,
236}
237
238impl StreamBroadcast {
239    /// Creates a new broadcast.
240    ///
241    /// Returns a sender for broadcasting messages and the broadcast instance
242    /// for creating receivers.
243    pub(crate) fn new() -> (StreamSender, Self) {
244        let (sender, receiver) = async_broadcast::broadcast(1);
245        (
246            sender.into(),
247            Self {
248                receiver: receiver.deactivate(),
249            },
250        )
251    }
252
253    /// Creates a new receiver for observing the message stream.
254    ///
255    /// Each receiver will get its own copy of every message.
256    pub(crate) fn receiver(&self) -> StreamReceiver {
257        let was_empty = self.receiver.receiver_count() == 0;
258        let mut new_receiver = self.receiver.activate_cloned();
259        if was_empty {
260            // Grow capacity once we actually have a receiver
261            new_receiver.set_capacity(64);
262        }
263        new_receiver.into()
264    }
265}
266
267impl<Local: Side, Remote: Side> From<OutgoingMessage<Local, Remote>> for StreamMessage {
268    fn from(message: OutgoingMessage<Local, Remote>) -> Self {
269        Self {
270            direction: StreamMessageDirection::Outgoing,
271            message: match message {
272                OutgoingMessage::Request(Request { id, method, params }) => {
273                    StreamMessageContent::Request {
274                        id,
275                        method,
276                        params: serde_json::to_value(params).ok(),
277                    }
278                }
279                OutgoingMessage::Response(Response::Result { id, result }) => {
280                    StreamMessageContent::Response {
281                        id,
282                        result: Ok(serde_json::to_value(result).ok()),
283                    }
284                }
285                OutgoingMessage::Response(Response::Error { id, error }) => {
286                    StreamMessageContent::Response {
287                        id,
288                        result: Err(error),
289                    }
290                }
291                OutgoingMessage::Notification(Notification { method, params }) => {
292                    StreamMessageContent::Notification {
293                        method,
294                        params: serde_json::to_value(params).ok(),
295                    }
296                }
297            },
298        }
299    }
300}