agent_client_protocol/
stream_broadcast.rs

1//! JSON-RPC Stream broadcasting for debugging and monitoring communication.
2//!
3//! This module provides functionality to observe the JSON-RPC message stream between
4//! clients and agents. It's primarily used for debugging, logging, and building
5//! development tools that need to monitor the protocol communication.
6
7use std::sync::Arc;
8
9use agent_client_protocol_schema::Error;
10use anyhow::Result;
11use serde::Serialize;
12use serde_json::value::RawValue;
13
14use super::rpc::{Id, OutgoingMessage, ResponseResult, Side};
15
16/// A message that flows through the RPC stream.
17///
18/// This represents any JSON-RPC message (request, response, or notification)
19/// along with its direction (incoming or outgoing).
20///
21/// Stream messages are used for observing and debugging the protocol communication
22/// without interfering with the actual message handling.
23#[derive(Debug, Clone)]
24pub struct StreamMessage {
25    /// The direction of the message relative to this side of the connection.
26    pub direction: StreamMessageDirection,
27    /// The actual content of the message.
28    pub message: StreamMessageContent,
29}
30
/// Which way a message travelled through the RPC stream, as seen from this
/// side of the connection.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamMessageDirection {
    /// The message arrived from the other side of the connection.
    Incoming,
    /// The message was sent to the other side of the connection.
    Outgoing,
}
39
40/// The content of a stream message.
41///
42/// This enum represents the three types of JSON-RPC messages:
43/// - Requests: Method calls that expect a response
44/// - Responses: Replies to previous requests
45/// - Notifications: One-way messages that don't expect a response
46#[derive(Debug, Clone)]
47pub enum StreamMessageContent {
48    /// A JSON-RPC request message.
49    Request {
50        /// The unique identifier for this request.
51        id: Id,
52        /// The name of the method being called.
53        method: Arc<str>,
54        /// Optional parameters for the method.
55        params: Option<serde_json::Value>,
56    },
57    /// A JSON-RPC response message.
58    Response {
59        /// The ID of the request this response is for.
60        id: Id,
61        /// The result of the request (success or error).
62        result: Result<Option<serde_json::Value>, Error>,
63    },
64    /// A JSON-RPC notification message.
65    Notification {
66        /// The name of the notification method.
67        method: Arc<str>,
68        /// Optional parameters for the notification.
69        params: Option<serde_json::Value>,
70    },
71}
72
73/// A receiver for observing the message stream.
74///
75/// This allows you to receive copies of all messages flowing through the connection,
76/// useful for debugging, logging, or building development tools.
77///
78/// # Example
79///
80/// ```no_run
81/// use agent_client_protocol::{StreamReceiver, StreamMessageDirection};
82///
83/// async fn monitor_messages(mut receiver: StreamReceiver) {
84///     while let Ok(message) = receiver.recv().await {
85///         match message.direction {
86///             StreamMessageDirection::Incoming => println!("← Received: {:?}", message.message),
87///             StreamMessageDirection::Outgoing => println!("→ Sent: {:?}", message.message),
88///         }
89///     }
90/// }
91/// ```
92pub struct StreamReceiver(async_broadcast::Receiver<StreamMessage>);
93
94impl StreamReceiver {
95    /// Receives the next message from the stream.
96    ///
97    /// This method will wait until a message is available or the sender is dropped.
98    ///
99    /// # Returns
100    ///
101    /// - `Ok(StreamMessage)` when a message is received
102    /// - `Err` when the sender is dropped or the receiver is lagged
103    pub async fn recv(&mut self) -> Result<StreamMessage> {
104        Ok(self.0.recv().await?)
105    }
106}
107
108/// Internal sender for broadcasting stream messages.
109///
110/// This is used internally by the RPC system to broadcast messages to all receivers.
111/// You typically won't interact with this directly.
112pub(crate) struct StreamSender(async_broadcast::Sender<StreamMessage>);
113
114impl StreamSender {
115    /// Broadcasts an outgoing message to all receivers.
116    pub(crate) fn outgoing<L: Side, R: Side>(&self, message: &OutgoingMessage<L, R>) {
117        if self.0.receiver_count() == 0 {
118            return;
119        }
120
121        let message = StreamMessage {
122            direction: StreamMessageDirection::Outgoing,
123            message: match message {
124                OutgoingMessage::Request { id, method, params } => StreamMessageContent::Request {
125                    id: id.clone(),
126                    method: method.clone(),
127                    params: serde_json::to_value(params).ok(),
128                },
129                OutgoingMessage::Response { id, result } => StreamMessageContent::Response {
130                    id: id.clone(),
131                    result: match result {
132                        ResponseResult::Result(value) => Ok(serde_json::to_value(value).ok()),
133                        ResponseResult::Error(error) => Err(error.clone()),
134                    },
135                },
136                OutgoingMessage::Notification { method, params } => {
137                    StreamMessageContent::Notification {
138                        method: method.clone(),
139                        params: serde_json::to_value(params).ok(),
140                    }
141                }
142            },
143        };
144
145        self.0.try_broadcast(message).ok();
146    }
147
148    /// Broadcasts an incoming request to all receivers.
149    pub(crate) fn incoming_request(
150        &self,
151        id: Id,
152        method: impl Into<Arc<str>>,
153        params: &impl Serialize,
154    ) {
155        if self.0.receiver_count() == 0 {
156            return;
157        }
158
159        let message = StreamMessage {
160            direction: StreamMessageDirection::Incoming,
161            message: StreamMessageContent::Request {
162                id,
163                method: method.into(),
164                params: serde_json::to_value(params).ok(),
165            },
166        };
167
168        self.0.try_broadcast(message).ok();
169    }
170
171    /// Broadcasts an incoming response to all receivers.
172    pub(crate) fn incoming_response(&self, id: Id, result: Result<Option<&RawValue>, &Error>) {
173        if self.0.receiver_count() == 0 {
174            return;
175        }
176
177        let result = match result {
178            Ok(Some(value)) => Ok(serde_json::from_str(value.get()).ok()),
179            Ok(None) => Ok(None),
180            Err(err) => Err(err.clone()),
181        };
182
183        let message = StreamMessage {
184            direction: StreamMessageDirection::Incoming,
185            message: StreamMessageContent::Response { id, result },
186        };
187
188        self.0.try_broadcast(message).ok();
189    }
190
191    /// Broadcasts an incoming notification to all receivers.
192    pub(crate) fn incoming_notification(
193        &self,
194        method: impl Into<Arc<str>>,
195        params: &impl Serialize,
196    ) {
197        if self.0.receiver_count() == 0 {
198            return;
199        }
200
201        let message = StreamMessage {
202            direction: StreamMessageDirection::Incoming,
203            message: StreamMessageContent::Notification {
204                method: method.into(),
205                params: serde_json::to_value(params).ok(),
206            },
207        };
208
209        self.0.try_broadcast(message).ok();
210    }
211}
212
213/// A broadcast for observing RPC message streams.
214///
215/// This is used internally by the RPC connection to allow multiple receivers
216/// to observe the message stream.
217pub(crate) struct StreamBroadcast {
218    receiver: async_broadcast::InactiveReceiver<StreamMessage>,
219}
220
221impl StreamBroadcast {
222    /// Creates a new broadcast.
223    ///
224    /// Returns a sender for broadcasting messages and the broadcast instance
225    /// for creating receivers.
226    pub(crate) fn new() -> (StreamSender, Self) {
227        let (sender, receiver) = async_broadcast::broadcast(1);
228        (
229            StreamSender(sender),
230            Self {
231                receiver: receiver.deactivate(),
232            },
233        )
234    }
235
236    /// Creates a new receiver for observing the message stream.
237    ///
238    /// Each receiver will get its own copy of every message.
239    pub(crate) fn receiver(&self) -> StreamReceiver {
240        let was_empty = self.receiver.receiver_count() == 0;
241        let mut new_receiver = self.receiver.activate_cloned();
242        if was_empty {
243            // Grow capacity once we actually have a receiver
244            new_receiver.set_capacity(64);
245        }
246        StreamReceiver(new_receiver)
247    }
248}
249
250impl<Local: Side, Remote: Side> From<OutgoingMessage<Local, Remote>> for StreamMessage {
251    fn from(message: OutgoingMessage<Local, Remote>) -> Self {
252        Self {
253            direction: StreamMessageDirection::Outgoing,
254            message: match message {
255                OutgoingMessage::Request { id, method, params } => StreamMessageContent::Request {
256                    id,
257                    method,
258                    params: serde_json::to_value(params).ok(),
259                },
260                OutgoingMessage::Response { id, result } => StreamMessageContent::Response {
261                    id,
262                    result: match result {
263                        ResponseResult::Result(value) => Ok(serde_json::to_value(value).ok()),
264                        ResponseResult::Error(error) => Err(error),
265                    },
266                },
267                OutgoingMessage::Notification { method, params } => {
268                    StreamMessageContent::Notification {
269                        method,
270                        params: serde_json::to_value(params).ok(),
271                    }
272                }
273            },
274        }
275    }
276}