agent_client_protocol/stream_broadcast.rs
1//! JSON-RPC Stream broadcasting for debugging and monitoring communication.
2//!
3//! This module provides functionality to observe the JSON-RPC message stream between
4//! clients and agents. It's primarily used for debugging, logging, and building
5//! development tools that need to monitor the protocol communication.
6
7use std::sync::Arc;
8
9use agent_client_protocol_schema::Error;
10use anyhow::Result;
11use serde::Serialize;
12use serde_json::value::RawValue;
13
14use super::rpc::{Id, OutgoingMessage, ResponseResult, Side};
15
16/// A message that flows through the RPC stream.
17///
18/// This represents any JSON-RPC message (request, response, or notification)
19/// along with its direction (incoming or outgoing).
20///
21/// Stream messages are used for observing and debugging the protocol communication
22/// without interfering with the actual message handling.
23#[derive(Debug, Clone)]
24pub struct StreamMessage {
25 /// The direction of the message relative to this side of the connection.
26 pub direction: StreamMessageDirection,
27 /// The actual content of the message.
28 pub message: StreamMessageContent,
29}
30
/// Which way a message travelled, relative to this side of the connection.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum StreamMessageDirection {
    /// The other side of the connection sent the message to this side.
    Incoming,
    /// This side sent the message to the other side of the connection.
    Outgoing,
}
39
40/// The content of a stream message.
41///
42/// This enum represents the three types of JSON-RPC messages:
43/// - Requests: Method calls that expect a response
44/// - Responses: Replies to previous requests
45/// - Notifications: One-way messages that don't expect a response
46#[derive(Debug, Clone)]
47pub enum StreamMessageContent {
48 /// A JSON-RPC request message.
49 Request {
50 /// The unique identifier for this request.
51 id: Id,
52 /// The name of the method being called.
53 method: Arc<str>,
54 /// Optional parameters for the method.
55 params: Option<serde_json::Value>,
56 },
57 /// A JSON-RPC response message.
58 Response {
59 /// The ID of the request this response is for.
60 id: Id,
61 /// The result of the request (success or error).
62 result: Result<Option<serde_json::Value>, Error>,
63 },
64 /// A JSON-RPC notification message.
65 Notification {
66 /// The name of the notification method.
67 method: Arc<str>,
68 /// Optional parameters for the notification.
69 params: Option<serde_json::Value>,
70 },
71}
72
73/// A receiver for observing the message stream.
74///
75/// This allows you to receive copies of all messages flowing through the connection,
76/// useful for debugging, logging, or building development tools.
77///
78/// # Example
79///
80/// ```no_run
81/// use agent_client_protocol::{StreamReceiver, StreamMessageDirection};
82///
83/// async fn monitor_messages(mut receiver: StreamReceiver) {
84/// while let Ok(message) = receiver.recv().await {
85/// match message.direction {
86/// StreamMessageDirection::Incoming => println!("← Received: {:?}", message.message),
87/// StreamMessageDirection::Outgoing => println!("→ Sent: {:?}", message.message),
88/// }
89/// }
90/// }
91/// ```
92pub struct StreamReceiver(async_broadcast::Receiver<StreamMessage>);
93
94impl StreamReceiver {
95 /// Receives the next message from the stream.
96 ///
97 /// This method will wait until a message is available or the sender is dropped.
98 ///
99 /// # Returns
100 ///
101 /// - `Ok(StreamMessage)` when a message is received
102 /// - `Err` when the sender is dropped or the receiver is lagged
103 pub async fn recv(&mut self) -> Result<StreamMessage> {
104 Ok(self.0.recv().await?)
105 }
106}
107
108/// Internal sender for broadcasting stream messages.
109///
110/// This is used internally by the RPC system to broadcast messages to all receivers.
111/// You typically won't interact with this directly.
112pub(crate) struct StreamSender(async_broadcast::Sender<StreamMessage>);
113
114impl StreamSender {
115 /// Broadcasts an outgoing message to all receivers.
116 pub(crate) fn outgoing<L: Side, R: Side>(&self, message: &OutgoingMessage<L, R>) {
117 if self.0.receiver_count() == 0 {
118 return;
119 }
120
121 let message = StreamMessage {
122 direction: StreamMessageDirection::Outgoing,
123 message: match message {
124 OutgoingMessage::Request { id, method, params } => StreamMessageContent::Request {
125 id: id.clone(),
126 method: method.clone(),
127 params: serde_json::to_value(params).ok(),
128 },
129 OutgoingMessage::Response { id, result } => StreamMessageContent::Response {
130 id: id.clone(),
131 result: match result {
132 ResponseResult::Result(value) => Ok(serde_json::to_value(value).ok()),
133 ResponseResult::Error(error) => Err(error.clone()),
134 },
135 },
136 OutgoingMessage::Notification { method, params } => {
137 StreamMessageContent::Notification {
138 method: method.clone(),
139 params: serde_json::to_value(params).ok(),
140 }
141 }
142 },
143 };
144
145 self.0.try_broadcast(message).ok();
146 }
147
148 /// Broadcasts an incoming request to all receivers.
149 pub(crate) fn incoming_request(
150 &self,
151 id: Id,
152 method: impl Into<Arc<str>>,
153 params: &impl Serialize,
154 ) {
155 if self.0.receiver_count() == 0 {
156 return;
157 }
158
159 let message = StreamMessage {
160 direction: StreamMessageDirection::Incoming,
161 message: StreamMessageContent::Request {
162 id,
163 method: method.into(),
164 params: serde_json::to_value(params).ok(),
165 },
166 };
167
168 self.0.try_broadcast(message).ok();
169 }
170
171 /// Broadcasts an incoming response to all receivers.
172 pub(crate) fn incoming_response(&self, id: Id, result: Result<Option<&RawValue>, &Error>) {
173 if self.0.receiver_count() == 0 {
174 return;
175 }
176
177 let result = match result {
178 Ok(Some(value)) => Ok(serde_json::from_str(value.get()).ok()),
179 Ok(None) => Ok(None),
180 Err(err) => Err(err.clone()),
181 };
182
183 let message = StreamMessage {
184 direction: StreamMessageDirection::Incoming,
185 message: StreamMessageContent::Response { id, result },
186 };
187
188 self.0.try_broadcast(message).ok();
189 }
190
191 /// Broadcasts an incoming notification to all receivers.
192 pub(crate) fn incoming_notification(
193 &self,
194 method: impl Into<Arc<str>>,
195 params: &impl Serialize,
196 ) {
197 if self.0.receiver_count() == 0 {
198 return;
199 }
200
201 let message = StreamMessage {
202 direction: StreamMessageDirection::Incoming,
203 message: StreamMessageContent::Notification {
204 method: method.into(),
205 params: serde_json::to_value(params).ok(),
206 },
207 };
208
209 self.0.try_broadcast(message).ok();
210 }
211}
212
213/// A broadcast for observing RPC message streams.
214///
215/// This is used internally by the RPC connection to allow multiple receivers
216/// to observe the message stream.
217pub(crate) struct StreamBroadcast {
218 receiver: async_broadcast::InactiveReceiver<StreamMessage>,
219}
220
221impl StreamBroadcast {
222 /// Creates a new broadcast.
223 ///
224 /// Returns a sender for broadcasting messages and the broadcast instance
225 /// for creating receivers.
226 pub(crate) fn new() -> (StreamSender, Self) {
227 let (sender, receiver) = async_broadcast::broadcast(1);
228 (
229 StreamSender(sender),
230 Self {
231 receiver: receiver.deactivate(),
232 },
233 )
234 }
235
236 /// Creates a new receiver for observing the message stream.
237 ///
238 /// Each receiver will get its own copy of every message.
239 pub(crate) fn receiver(&self) -> StreamReceiver {
240 let was_empty = self.receiver.receiver_count() == 0;
241 let mut new_receiver = self.receiver.activate_cloned();
242 if was_empty {
243 // Grow capacity once we actually have a receiver
244 new_receiver.set_capacity(64);
245 }
246 StreamReceiver(new_receiver)
247 }
248}
249
250impl<Local: Side, Remote: Side> From<OutgoingMessage<Local, Remote>> for StreamMessage {
251 fn from(message: OutgoingMessage<Local, Remote>) -> Self {
252 Self {
253 direction: StreamMessageDirection::Outgoing,
254 message: match message {
255 OutgoingMessage::Request { id, method, params } => StreamMessageContent::Request {
256 id,
257 method,
258 params: serde_json::to_value(params).ok(),
259 },
260 OutgoingMessage::Response { id, result } => StreamMessageContent::Response {
261 id,
262 result: match result {
263 ResponseResult::Result(value) => Ok(serde_json::to_value(value).ok()),
264 ResponseResult::Error(error) => Err(error),
265 },
266 },
267 OutgoingMessage::Notification { method, params } => {
268 StreamMessageContent::Notification {
269 method,
270 params: serde_json::to_value(params).ok(),
271 }
272 }
273 },
274 }
275 }
276}