agent_client_protocol/stream_broadcast.rs
1//! JSON-RPC Stream broadcasting for debugging and monitoring communication.
2//!
3//! This module provides functionality to observe the JSON-RPC message stream between
4//! clients and agents. It's primarily used for debugging, logging, and building
5//! development tools that need to monitor the protocol communication.
6
7use std::sync::Arc;
8
9use anyhow::Result;
10use serde::Serialize;
11use serde_json::value::RawValue;
12
13use crate::{
14 Error,
15 rpc::{OutgoingMessage, ResponseResult, Side},
16};
17
18/// A message that flows through the RPC stream.
19///
20/// This represents any JSON-RPC message (request, response, or notification)
21/// along with its direction (incoming or outgoing).
22///
23/// Stream messages are used for observing and debugging the protocol communication
24/// without interfering with the actual message handling.
25#[derive(Debug, Clone)]
26pub struct StreamMessage {
27 /// The direction of the message relative to this side of the connection.
28 pub direction: StreamMessageDirection,
29 /// The actual content of the message.
30 pub message: StreamMessageContent,
31}
32
/// Which way a message travelled, seen from this side of the connection.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum StreamMessageDirection {
    /// The message arrived from the other side of the connection.
    Incoming,
    /// The message was sent by this side to the other side of the connection.
    Outgoing,
}
41
42/// The content of a stream message.
43///
44/// This enum represents the three types of JSON-RPC messages:
45/// - Requests: Method calls that expect a response
46/// - Responses: Replies to previous requests
47/// - Notifications: One-way messages that don't expect a response
48#[derive(Debug, Clone)]
49pub enum StreamMessageContent {
50 /// A JSON-RPC request message.
51 Request {
52 /// The unique identifier for this request.
53 id: i32,
54 /// The name of the method being called.
55 method: Arc<str>,
56 /// Optional parameters for the method.
57 params: Option<serde_json::Value>,
58 },
59 /// A JSON-RPC response message.
60 Response {
61 /// The ID of the request this response is for.
62 id: i32,
63 /// The result of the request (success or error).
64 result: Result<Option<serde_json::Value>, Error>,
65 },
66 /// A JSON-RPC notification message.
67 Notification {
68 /// The name of the notification method.
69 method: Arc<str>,
70 /// Optional parameters for the notification.
71 params: Option<serde_json::Value>,
72 },
73}
74
75/// A receiver for observing the message stream.
76///
77/// This allows you to receive copies of all messages flowing through the connection,
78/// useful for debugging, logging, or building development tools.
79///
80/// # Example
81///
82/// ```no_run
83/// use agent_client_protocol::{StreamReceiver, StreamMessageDirection};
84///
85/// async fn monitor_messages(mut receiver: StreamReceiver) {
86/// while let Ok(message) = receiver.recv().await {
87/// match message.direction {
88/// StreamMessageDirection::Incoming => println!("← Received: {:?}", message.message),
89/// StreamMessageDirection::Outgoing => println!("→ Sent: {:?}", message.message),
90/// }
91/// }
92/// }
93/// ```
94pub struct StreamReceiver(async_broadcast::Receiver<StreamMessage>);
95
96impl StreamReceiver {
97 /// Receives the next message from the stream.
98 ///
99 /// This method will wait until a message is available or the sender is dropped.
100 ///
101 /// # Returns
102 ///
103 /// - `Ok(StreamMessage)` when a message is received
104 /// - `Err` when the sender is dropped or the receiver is lagged
105 pub async fn recv(&mut self) -> Result<StreamMessage> {
106 Ok(self.0.recv().await?)
107 }
108}
109
110/// Internal sender for broadcasting stream messages.
111///
112/// This is used internally by the RPC system to broadcast messages to all receivers.
113/// You typically won't interact with this directly.
114pub(crate) struct StreamSender(async_broadcast::Sender<StreamMessage>);
115
116impl StreamSender {
117 /// Broadcasts an outgoing message to all receivers.
118 pub(crate) fn outgoing<L: Side, R: Side>(&self, message: &OutgoingMessage<L, R>) {
119 if self.0.receiver_count() == 0 {
120 return;
121 }
122
123 let message = StreamMessage {
124 direction: StreamMessageDirection::Outgoing,
125 message: match message {
126 OutgoingMessage::Request { id, method, params } => StreamMessageContent::Request {
127 id: *id,
128 method: method.clone(),
129 params: serde_json::to_value(params).ok(),
130 },
131 OutgoingMessage::Response { id, result } => StreamMessageContent::Response {
132 id: *id,
133 result: match result {
134 ResponseResult::Result(value) => Ok(serde_json::to_value(value).ok()),
135 ResponseResult::Error(error) => Err(error.clone()),
136 },
137 },
138 OutgoingMessage::Notification { method, params } => {
139 StreamMessageContent::Notification {
140 method: method.clone(),
141 params: serde_json::to_value(params).ok(),
142 }
143 }
144 },
145 };
146
147 self.0.try_broadcast(message).ok();
148 }
149
150 /// Broadcasts an incoming request to all receivers.
151 pub(crate) fn incoming_request(
152 &self,
153 id: i32,
154 method: impl Into<Arc<str>>,
155 params: &impl Serialize,
156 ) {
157 if self.0.receiver_count() == 0 {
158 return;
159 }
160
161 let message = StreamMessage {
162 direction: StreamMessageDirection::Incoming,
163 message: StreamMessageContent::Request {
164 id,
165 method: method.into(),
166 params: serde_json::to_value(params).ok(),
167 },
168 };
169
170 self.0.try_broadcast(message).ok();
171 }
172
173 /// Broadcasts an incoming response to all receivers.
174 pub(crate) fn incoming_response(&self, id: i32, result: Result<Option<&RawValue>, &Error>) {
175 if self.0.receiver_count() == 0 {
176 return;
177 }
178
179 let result = match result {
180 Ok(Some(value)) => Ok(serde_json::from_str(value.get()).ok()),
181 Ok(None) => Ok(None),
182 Err(err) => Err(err.clone()),
183 };
184
185 let message = StreamMessage {
186 direction: StreamMessageDirection::Incoming,
187 message: StreamMessageContent::Response { id, result },
188 };
189
190 self.0.try_broadcast(message).ok();
191 }
192
193 /// Broadcasts an incoming notification to all receivers.
194 pub(crate) fn incoming_notification(
195 &self,
196 method: impl Into<Arc<str>>,
197 params: &impl Serialize,
198 ) {
199 if self.0.receiver_count() == 0 {
200 return;
201 }
202
203 let message = StreamMessage {
204 direction: StreamMessageDirection::Incoming,
205 message: StreamMessageContent::Notification {
206 method: method.into(),
207 params: serde_json::to_value(params).ok(),
208 },
209 };
210
211 self.0.try_broadcast(message).ok();
212 }
213}
214
215/// A broadcast for observing RPC message streams.
216///
217/// This is used internally by the RPC connection to allow multiple receivers
218/// to observe the message stream.
219pub(crate) struct StreamBroadcast {
220 receiver: async_broadcast::InactiveReceiver<StreamMessage>,
221}
222
223impl StreamBroadcast {
224 /// Creates a new broadcast.
225 ///
226 /// Returns a sender for broadcasting messages and the broadcast instance
227 /// for creating receivers.
228 pub(crate) fn new() -> (StreamSender, Self) {
229 let (sender, receiver) = async_broadcast::broadcast(1);
230 (
231 StreamSender(sender),
232 Self {
233 receiver: receiver.deactivate(),
234 },
235 )
236 }
237
238 /// Creates a new receiver for observing the message stream.
239 ///
240 /// Each receiver will get its own copy of every message.
241 pub(crate) fn receiver(&self) -> StreamReceiver {
242 let was_empty = self.receiver.receiver_count() == 0;
243 let mut new_receiver = self.receiver.activate_cloned();
244 if was_empty {
245 // Grow capacity once we actually have a receiver
246 new_receiver.set_capacity(64);
247 }
248 StreamReceiver(new_receiver)
249 }
250}
251
252impl<Local: Side, Remote: Side> From<OutgoingMessage<Local, Remote>> for StreamMessage {
253 fn from(message: OutgoingMessage<Local, Remote>) -> Self {
254 Self {
255 direction: StreamMessageDirection::Outgoing,
256 message: match message {
257 OutgoingMessage::Request { id, method, params } => StreamMessageContent::Request {
258 id,
259 method,
260 params: serde_json::to_value(params).ok(),
261 },
262 OutgoingMessage::Response { id, result } => StreamMessageContent::Response {
263 id,
264 result: match result {
265 ResponseResult::Result(value) => Ok(serde_json::to_value(value).ok()),
266 ResponseResult::Error(error) => Err(error),
267 },
268 },
269 OutgoingMessage::Notification { method, params } => {
270 StreamMessageContent::Notification {
271 method,
272 params: serde_json::to_value(params).ok(),
273 }
274 }
275 },
276 }
277 }
278}