agent_client_protocol/stream_broadcast.rs
1//! JSON-RPC Stream broadcasting for debugging and monitoring communication.
2//!
3//! This module provides functionality to observe the JSON-RPC message stream between
4//! clients and agents. It's primarily used for debugging, logging, and building
5//! development tools that need to monitor the protocol communication.
6
7use std::sync::Arc;
8
9use agent_client_protocol_schema::{
10 Error, OutgoingMessage, RequestId, ResponseResult, Result, Side,
11};
12use derive_more::From;
13use serde::Serialize;
14use serde_json::value::RawValue;
15
16/// A message that flows through the RPC stream.
17///
18/// This represents any JSON-RPC message (request, response, or notification)
19/// along with its direction (incoming or outgoing).
20///
21/// Stream messages are used for observing and debugging the protocol communication
22/// without interfering with the actual message handling.
23#[derive(Debug, Clone, PartialEq, Eq)]
24pub struct StreamMessage {
25 /// The direction of the message relative to this side of the connection.
26 pub direction: StreamMessageDirection,
27 /// The actual content of the message.
28 pub message: StreamMessageContent,
29}
30
/// Which way a message travelled through the RPC stream, relative to this
/// side of the connection.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum StreamMessageDirection {
    /// The message was received from the other side of the connection.
    Incoming,
    /// The message was sent to the other side of the connection.
    Outgoing,
}
39
40/// The content of a stream message.
41///
42/// This enum represents the three types of JSON-RPC messages:
43/// - Requests: Method calls that expect a response
44/// - Responses: Replies to previous requests
45/// - Notifications: One-way messages that don't expect a response
46#[derive(Debug, Clone, PartialEq, Eq)]
47pub enum StreamMessageContent {
48 /// A JSON-RPC request message.
49 Request {
50 /// The unique identifier for this request.
51 id: RequestId,
52 /// The name of the method being called.
53 method: Arc<str>,
54 /// Optional parameters for the method.
55 params: Option<serde_json::Value>,
56 },
57 /// A JSON-RPC response message.
58 Response {
59 /// The ID of the request this response is for.
60 id: RequestId,
61 /// The result of the request (success or error).
62 result: Result<Option<serde_json::Value>>,
63 },
64 /// A JSON-RPC notification message.
65 Notification {
66 /// The name of the notification method.
67 method: Arc<str>,
68 /// Optional parameters for the notification.
69 params: Option<serde_json::Value>,
70 },
71}
72
73/// A receiver for observing the message stream.
74///
75/// This allows you to receive copies of all messages flowing through the connection,
76/// useful for debugging, logging, or building development tools.
77///
78/// # Example
79///
80/// ```no_run
81/// use agent_client_protocol::{StreamReceiver, StreamMessageDirection};
82///
83/// async fn monitor_messages(mut receiver: StreamReceiver) {
84/// while let Ok(message) = receiver.recv().await {
85/// match message.direction {
86/// StreamMessageDirection::Incoming => println!("← Received: {:?}", message.message),
87/// StreamMessageDirection::Outgoing => println!("→ Sent: {:?}", message.message),
88/// }
89/// }
90/// }
91/// ```
92#[derive(Debug, From)]
93pub struct StreamReceiver(async_broadcast::Receiver<StreamMessage>);
94
95impl StreamReceiver {
96 /// Receives the next message from the stream.
97 ///
98 /// This method will wait until a message is available or the sender is dropped.
99 ///
100 /// # Returns
101 ///
102 /// - `Ok(StreamMessage)` when a message is received
103 /// - `Err` when the sender is dropped or the receiver is lagged
104 pub async fn recv(&mut self) -> Result<StreamMessage> {
105 self.0
106 .recv()
107 .await
108 .map_err(|e| Error::internal_error().with_data(e.to_string()))
109 }
110}
111
112/// Internal sender for broadcasting stream messages.
113///
114/// This is used internally by the RPC system to broadcast messages to all receivers.
115/// You typically won't interact with this directly.
116#[derive(Clone, Debug, From)]
117pub(crate) struct StreamSender(async_broadcast::Sender<StreamMessage>);
118
119impl StreamSender {
120 /// Broadcasts an outgoing message to all receivers.
121 pub(crate) fn outgoing<L: Side, R: Side>(&self, message: &OutgoingMessage<L, R>) {
122 if self.0.receiver_count() == 0 {
123 return;
124 }
125
126 let message = StreamMessage {
127 direction: StreamMessageDirection::Outgoing,
128 message: match message {
129 OutgoingMessage::Request { id, method, params } => StreamMessageContent::Request {
130 id: id.clone(),
131 method: method.clone(),
132 params: serde_json::to_value(params).ok(),
133 },
134 OutgoingMessage::Response { id, result } => StreamMessageContent::Response {
135 id: id.clone(),
136 result: match result {
137 ResponseResult::Result(value) => Ok(serde_json::to_value(value).ok()),
138 ResponseResult::Error(error) => Err(error.clone()),
139 },
140 },
141 OutgoingMessage::Notification { method, params } => {
142 StreamMessageContent::Notification {
143 method: method.clone(),
144 params: serde_json::to_value(params).ok(),
145 }
146 }
147 },
148 };
149
150 self.0.try_broadcast(message).ok();
151 }
152
153 /// Broadcasts an incoming request to all receivers.
154 pub(crate) fn incoming_request(
155 &self,
156 id: RequestId,
157 method: impl Into<Arc<str>>,
158 params: &impl Serialize,
159 ) {
160 if self.0.receiver_count() == 0 {
161 return;
162 }
163
164 let message = StreamMessage {
165 direction: StreamMessageDirection::Incoming,
166 message: StreamMessageContent::Request {
167 id,
168 method: method.into(),
169 params: serde_json::to_value(params).ok(),
170 },
171 };
172
173 self.0.try_broadcast(message).ok();
174 }
175
176 /// Broadcasts an incoming response to all receivers.
177 pub(crate) fn incoming_response(
178 &self,
179 id: RequestId,
180 result: Result<Option<&RawValue>, &Error>,
181 ) {
182 if self.0.receiver_count() == 0 {
183 return;
184 }
185
186 let result = match result {
187 Ok(Some(value)) => Ok(serde_json::from_str(value.get()).ok()),
188 Ok(None) => Ok(None),
189 Err(err) => Err(err.clone()),
190 };
191
192 let message = StreamMessage {
193 direction: StreamMessageDirection::Incoming,
194 message: StreamMessageContent::Response { id, result },
195 };
196
197 self.0.try_broadcast(message).ok();
198 }
199
200 /// Broadcasts an incoming notification to all receivers.
201 pub(crate) fn incoming_notification(
202 &self,
203 method: impl Into<Arc<str>>,
204 params: &impl Serialize,
205 ) {
206 if self.0.receiver_count() == 0 {
207 return;
208 }
209
210 let message = StreamMessage {
211 direction: StreamMessageDirection::Incoming,
212 message: StreamMessageContent::Notification {
213 method: method.into(),
214 params: serde_json::to_value(params).ok(),
215 },
216 };
217
218 self.0.try_broadcast(message).ok();
219 }
220}
221
222/// A broadcast for observing RPC message streams.
223///
224/// This is used internally by the RPC connection to allow multiple receivers
225/// to observe the message stream.
226#[derive(Debug, Clone)]
227pub(crate) struct StreamBroadcast {
228 receiver: async_broadcast::InactiveReceiver<StreamMessage>,
229}
230
231impl StreamBroadcast {
232 /// Creates a new broadcast.
233 ///
234 /// Returns a sender for broadcasting messages and the broadcast instance
235 /// for creating receivers.
236 pub(crate) fn new() -> (StreamSender, Self) {
237 let (sender, receiver) = async_broadcast::broadcast(1);
238 (
239 sender.into(),
240 Self {
241 receiver: receiver.deactivate(),
242 },
243 )
244 }
245
246 /// Creates a new receiver for observing the message stream.
247 ///
248 /// Each receiver will get its own copy of every message.
249 pub(crate) fn receiver(&self) -> StreamReceiver {
250 let was_empty = self.receiver.receiver_count() == 0;
251 let mut new_receiver = self.receiver.activate_cloned();
252 if was_empty {
253 // Grow capacity once we actually have a receiver
254 new_receiver.set_capacity(64);
255 }
256 new_receiver.into()
257 }
258}
259
260impl<Local: Side, Remote: Side> From<OutgoingMessage<Local, Remote>> for StreamMessage {
261 fn from(message: OutgoingMessage<Local, Remote>) -> Self {
262 Self {
263 direction: StreamMessageDirection::Outgoing,
264 message: match message {
265 OutgoingMessage::Request { id, method, params } => StreamMessageContent::Request {
266 id,
267 method,
268 params: serde_json::to_value(params).ok(),
269 },
270 OutgoingMessage::Response { id, result } => StreamMessageContent::Response {
271 id,
272 result: match result {
273 ResponseResult::Result(value) => Ok(serde_json::to_value(value).ok()),
274 ResponseResult::Error(error) => Err(error),
275 },
276 },
277 OutgoingMessage::Notification { method, params } => {
278 StreamMessageContent::Notification {
279 method,
280 params: serde_json::to_value(params).ok(),
281 }
282 }
283 },
284 }
285 }
286}